This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d3cd8d880 ARTEMIS-4136 Mirrored sync replica
0d3cd8d880 is described below

commit 0d3cd8d88031dbf78851969a56041e839f828bf7
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Jan 18 16:58:39 2023 -0500

    ARTEMIS-4136 Mirrored sync replica
    
    Add a sync option (true or false) to the mirror configuration. When
    sync=true, any blocking client operation waits for a round trip to the
    mirror, so the mirror behaves like a synchronous replica.
---
 .../protocol/amqp/broker/AMQPLargeMessage.java     |   5 +-
 .../artemis/protocol/amqp/broker/AMQPMessage.java  |   8 +-
 .../mirror/AMQPMirrorControllerAggregation.java    |  12 +-
 .../connect/mirror/AMQPMirrorControllerSource.java | 299 +++++++++++--
 .../connect/mirror/AMQPMirrorControllerTarget.java |  13 +-
 .../amqp/connect/mirror/ReferenceNodeStore.java    |  15 +-
 .../amqp/proton/ProtonServerSenderContext.java     |   7 +-
 .../core/protocol/openwire/amq/AMQConsumer.java    |   4 +-
 .../AMQPMirrorBrokerConnectionElement.java         |  10 +
 .../deployers/impl/FileConfigurationParser.java    |   3 +-
 .../core/paging/cursor/PagedReferenceImpl.java     |  16 +-
 .../impl/journal/OperationContextImpl.java         |   8 +-
 .../artemis/core/postoffice/PostOffice.java        |   4 +
 .../core/postoffice/impl/PostOfficeImpl.java       |  24 +-
 .../artemis/core/server/MessageReference.java      |   4 +-
 .../server/impl/AbstractProtocolReference.java     |  49 +++
 .../server/impl/GroupFirstMessageReference.java    |   8 +-
 .../core/server/impl/MessageReferenceImpl.java     |  13 +-
 .../artemis/core/server/impl/QueueImpl.java        |   4 +-
 .../core/server/mirror/MirrorController.java       |   7 +-
 .../transaction/TransactionPropertyIndexes.java    |   4 +
 .../core/transaction/impl/TransactionImpl.java     |  29 +-
 .../resources/schema/artemis-configuration.xsd     |   8 +
 .../core/config/impl/ConfigurationImplTest.java    |   9 +
 .../resources/ConfigurationTest-full-config.xml    |   2 +-
 docs/user-manual/en/amqp-broker-connections.md     |   8 +-
 .../amqp/AmqpReferenceDeliveryAnnotationTest.java  |   2 +-
 .../integration/amqp/connect/AMQPReplicaTest.java  |   5 +-
 .../amqp/connect/AMQPSyncMirrorTest.java           | 474 +++++++++++++++++++++
 .../config/impl/ConfigurationValidationTest.java   |   2 +
 30 files changed, 934 insertions(+), 122 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
index ec86311338..e9987ed9ff 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
@@ -44,6 +44,7 @@ import 
org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
 import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Header;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Properties;
@@ -148,8 +149,8 @@ public class AMQPLargeMessage extends AMQPMessage 
implements LargeServerMessage
     * It was written to check the deliveryAnnotationsForSendBuffer and 
eventually move it to the protocolData.
     */
    public void checkReference(MessageReference reference) {
-      if (reference.getProtocolData() == null && 
deliveryAnnotationsForSendBuffer != null) {
-         reference.setProtocolData(deliveryAnnotationsForSendBuffer);
+      if (reference.getProtocolData(DeliveryAnnotations.class) == null && 
deliveryAnnotationsForSendBuffer != null) {
+         reference.setProtocolData(DeliveryAnnotations.class, 
deliveryAnnotationsForSendBuffer);
       }
    }
 
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index df3bdeda73..84e319cd2c 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -754,12 +754,10 @@ public abstract class AMQPMessage extends RefCountMessage 
implements org.apache.
    public ReadableBuffer getSendBuffer(int deliveryCount, MessageReference 
reference) {
       ensureDataIsValid();
 
-      DeliveryAnnotations daToWrite;
+      DeliveryAnnotations daToWrite = reference != null ? 
reference.getProtocolData(DeliveryAnnotations.class) : null;
 
-      if (reference != null && reference.getProtocolData() instanceof 
DeliveryAnnotations) {
-         daToWrite = (DeliveryAnnotations) reference.getProtocolData();
-      } else {
-         // deliveryAnnotationsForSendBuffer was an old API form where a 
deliver could set it before deliver
+      if (reference == null) {
+         // deliveryAnnotationsForSendBuffer is part of an older API, 
deprecated but still present
          daToWrite = deliveryAnnotationsForSendBuffer;
       }
 
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerAggregation.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerAggregation.java
index cc5fb7eafb..007f9a0d83 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerAggregation.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerAggregation.java
@@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.mirror.MirrorController;
+import org.apache.activemq.artemis.core.transaction.Transaction;
 
 /** this will be used when there are multiple replicas in use. */
 public class AMQPMirrorControllerAggregation implements MirrorController, 
ActiveMQComponent {
@@ -72,6 +73,13 @@ public class AMQPMirrorControllerAggregation implements 
MirrorController, Active
       return partitions;
    }
 
+   @Override
+   public void preAcknowledge(Transaction tx, MessageReference ref, AckReason 
reason) throws Exception {
+      for (MirrorController partition : partitions) {
+         partition.preAcknowledge(tx, ref, reason);
+      }
+   }
+
    @Override
    public void addAddress(AddressInfo addressInfo) throws Exception {
       for (MirrorController partition : partitions) {
@@ -102,9 +110,9 @@ public class AMQPMirrorControllerAggregation implements 
MirrorController, Active
    }
 
    @Override
-   public void sendMessage(Message message, RoutingContext context, 
List<MessageReference> refs) {
+   public void sendMessage(Transaction tx, Message message, RoutingContext 
context) {
       for (MirrorController partition : partitions) {
-         partition.sendMessage(message, context, refs);
+         partition.sendMessage(tx, message, context);
       }
    }
 
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index 188742af2c..d9052c4623 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -24,6 +25,9 @@ import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
 import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -34,6 +38,9 @@ import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
 import org.apache.activemq.artemis.core.server.mirror.MirrorController;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import 
org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
+import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import 
org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
 import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
@@ -86,6 +93,7 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
    final boolean deleteQueues;
    final MirrorAddressFilter addressFilter;
    private final AMQPBrokerConnection brokerConnection;
+   private final boolean sync;
 
    final AMQPMirrorBrokerConnectionElement replicaConfig;
 
@@ -116,6 +124,7 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
       this.addressFilter = new 
MirrorAddressFilter(replicaConfig.getAddressFilter());
       this.acks = replicaConfig.isMessageAcknowledgements();
       this.brokerConnection = brokerConnection;
+      this.sync = replicaConfig.isSync();
    }
 
    public Queue getSnfQueue() {
@@ -216,60 +225,120 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
    }
 
    @Override
-   public void sendMessage(Message message, RoutingContext context, 
List<MessageReference> refs) {
+   public void sendMessage(Transaction tx, Message message, RoutingContext 
context) {
       SimpleString address = context.getAddress(message);
 
       if (invalidTarget(context.getMirrorSource())) {
-         logger.trace("server {} is discarding send to avoid infinite loop 
(reflection with the mirror)", server);
+         logger.trace("sendMessage::server {} is discarding send to avoid 
infinite loop (reflection with the mirror)", server);
          return;
       }
 
       if (context.isInternal()) {
-         logger.trace("server {} is discarding send to avoid sending to 
internal queue", server);
+         logger.trace("sendMessage::server {} is discarding send to avoid 
sending to internal queue", server);
          return;
       }
 
       if (ignoreAddress(address)) {
-         logger.trace("server {} is discarding send to address {}, address 
doesn't match filter", server, address);
+         logger.trace("sendMessage::server {} is discarding send to address 
{}, address doesn't match filter", server, address);
          return;
       }
 
-      logger.trace("{} send message {}", server, message);
+      logger.trace("sendMessage::{} send message {}", server, message);
 
       try {
          context.setReusable(false);
 
-         MessageReference ref = 
MessageReference.Factory.createReference(message, snfQueue);
-         String nodeID = setProtocolData(idSupplier, ref);
+         String nodeID = idSupplier.getServerID(message);
+
          if (nodeID != null && nodeID.equals(getRemoteMirrorId())) {
-            logger.trace("Message {} already belonged to the node, {}, it 
won't circle send", message, getRemoteMirrorId());
+            logger.trace("sendMessage::Message {} already belonged to the 
node, {}, it won't circle send", message, getRemoteMirrorId());
             return;
          }
+
+         MessageReference ref = 
MessageReference.Factory.createReference(message, snfQueue);
+         setProtocolData(ref, nodeID, idSupplier.getID(ref));
+
          snfQueue.refUp(ref);
-         refs.add(ref);
+
+         if (tx != null) {
+            logger.debug("sendMessage::Mirroring Message {} with TX", message);
+            getSendOperation(tx).addRef(ref);
+         } // if non transactional the afterStoreOperations will use the ref 
directly and call processReferences
+
+         if (sync) {
+            OperationContext operContext = 
OperationContextImpl.getContext(server.getExecutorFactory());
+            if (tx == null) {
+               // notice that if transactional, the context is lined up on 
beforeCommit as part of the transaction operation
+               operContext.replicationLineUp();
+            }
+            if (logger.isDebugEnabled()) {
+               logger.debug("sendMessage::mirror syncUp context={}, ref={}", 
operContext, ref);
+            }
+            ref.setProtocolData(OperationContext.class, operContext);
+         }
 
          if (message.isDurable() && snfQueue.isDurable()) {
             PostOfficeImpl.storeDurableReference(server.getStorageManager(), 
message, context.getTransaction(), snfQueue, true);
          }
 
+         if (tx == null) {
+            server.getStorageManager().afterStoreOperations(new IOCallback() {
+               @Override
+               public void done() {
+                  PostOfficeImpl.processReference(ref, false);
+               }
+
+               @Override
+               public void onError(int errorCode, String errorMessage) {
+               }
+            });
+         }
       } catch (Throwable e) {
          logger.warn(e.getMessage(), e);
       }
    }
 
+   private void syncDone(MessageReference reference) {
+      OperationContext ctx = reference.getProtocolData(OperationContext.class);
+      if (ctx != null) {
+         ctx.replicationDone();
+         logger.debug("syncDone::replicationDone::ctx={},ref={}", ctx, 
reference);
+      }  else {
+         Message message = reference.getMessage();
+         if (message != null) {
+            ctx = (OperationContext) 
message.getUserContext(OperationContext.class);
+            if (ctx != null) {
+               ctx.replicationDone();
+               logger.debug("syncDone::replicationDone message={}", message);
+            } else {
+               logger.trace("syncDone::No operationContext set on message {}", 
message);
+            }
+         } else {
+            logger.debug("syncDone::no message set on reference {}", 
reference);
+         }
+      }
+   }
+
    public static void validateProtocolData(ReferenceNodeStore 
referenceIDSupplier, MessageReference ref, SimpleString snfAddress) {
-      if (ref.getProtocolData() == null && 
!ref.getMessage().getAddressSimpleString().equals(snfAddress)) {
+      if (ref.getProtocolData(DeliveryAnnotations.class) == null && 
!ref.getMessage().getAddressSimpleString().equals(snfAddress)) {
          setProtocolData(referenceIDSupplier, ref);
       }
    }
 
    /** This method will return the brokerID used by the message */
    private static String setProtocolData(ReferenceNodeStore 
referenceIDSupplier, MessageReference ref) {
+      String brokerID = referenceIDSupplier.getServerID(ref);
+      long id = referenceIDSupplier.getID(ref);
+
+      setProtocolData(ref, brokerID, id);
+
+      return brokerID;
+   }
+
+   private static void setProtocolData(MessageReference ref, String brokerID, 
long id) {
       Map<Symbol, Object> daMap = new HashMap<>();
       DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(daMap);
 
-      String brokerID = referenceIDSupplier.getServerID(ref);
-
       // getListID will return null when the message was generated on this 
broker.
       // on this case we do not send the brokerID, and the ControllerTarget 
will get the information from the link.
       // this is just to safe a few bytes and some processing on the wire.
@@ -278,8 +347,6 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
          daMap.put(BROKER_ID, brokerID);
       }
 
-      long id = referenceIDSupplier.getID(ref);
-
       daMap.put(INTERNAL_ID, id);
       String address = ref.getMessage().getAddress();
       if (address != null) { // this is the message that was set through 
routing
@@ -290,9 +357,7 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
             daMap.put(INTERNAL_DESTINATION, ref.getMessage().getAddress());
          }
       }
-      ref.setProtocolData(deliveryAnnotations);
-
-      return brokerID;
+      ref.setProtocolData(DeliveryAnnotations.class, deliveryAnnotations);
    }
 
    private static Properties getProperties(Message message) {
@@ -303,12 +368,30 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
       }
    }
 
+   private void postACKInternalMessage(MessageReference reference) {
+      logger.debug("postACKInternalMessage::server={}, ref={}", server, 
reference);
+      if (sync) {
+         syncDone(reference);
+      }
+   }
+
    @Override
    public void postAcknowledge(MessageReference ref, AckReason reason) throws 
Exception {
+      if (!acks || ref.getQueue().isMirrorController()) {
+         postACKInternalMessage(ref);
+         return;
+      }
+   }
+
+   @Override
+   public void preAcknowledge(final Transaction tx, final MessageReference 
ref, final AckReason reason) throws Exception {
+      if (logger.isTraceEnabled()) {
+         logger.trace("postACKInternalMessage::tx={}, ref={}, reason={}", tx, 
ref, reason);
+      }
 
       MirrorController controllerInUse = getControllerInUse();
 
-      if (!acks || ref.getQueue().isMirrorController()) { // we don't call 
postACK on snfqueues, otherwise we would get infinite loop because of this 
feedback/
+      if (!acks || ref.getQueue().isMirrorController()) { // we don't call 
preAcknowledge on snfqueues, otherwise we would get infinite loop because of 
this feedback/
          return;
       }
 
@@ -318,28 +401,192 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
 
       if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() || 
ref.getQueue().isMirrorController()))) {
          if (logger.isDebugEnabled()) {
-            logger.debug("{} rejecting postAcknowledge queue={}, ref={} to 
avoid infinite loop with the mirror (reflection)", server, 
ref.getQueue().getName(), ref);
+            logger.debug("preAcknowledge::{} rejecting preAcknowledge 
queue={}, ref={} to avoid infinite loop with the mirror (reflection)", server, 
ref.getQueue().getName(), ref);
          }
          return;
       }
 
       if (ignoreAddress(ref.getQueue().getAddress())) {
          if (logger.isTraceEnabled()) {
-            logger.trace("{} rejecting postAcknowledge queue={}, ref={}, queue 
address is excluded", server, ref.getQueue().getName(), ref);
+            logger.trace("preAcknowledge::{} rejecting preAcknowledge 
queue={}, ref={}, queue address is excluded", server, ref.getQueue().getName(), 
ref);
          }
          return;
       }
 
-      logger.trace("{} postAcknowledge {}", server, ref);
+      logger.trace("preAcknowledge::{} preAcknowledge {}", server, ref);
 
       String nodeID = idSupplier.getServerID(ref); // notice the brokerID will 
be null for any message generated on this broker.
       long internalID = idSupplier.getID(ref);
-      if (logger.isTraceEnabled()) {
-         logger.trace("{} sending ack message from server {} with 
messageID={}", server, nodeID, internalID);
+      Message messageCommand = createMessage(ref.getQueue().getAddress(), 
ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
+      if (sync) {
+         OperationContext operationContext;
+         operationContext = 
OperationContextImpl.getContext(server.getExecutorFactory());
+         messageCommand.setUserContext(OperationContext.class, 
operationContext);
+         if (tx == null) {
+            // notice that if transactional, the context is lined up on 
beforeCommit as part of the transaction operation
+            operationContext.replicationLineUp();
+         }
+      }
+
+      if (tx != null) {
+         MirrorACKOperation operation = getAckOperation(tx);
+         // notice the operationContext.replicationLineUp is done on 
beforeCommit as part of the TX
+         operation.addMessage(messageCommand, ref);
+      } else {
+         server.getStorageManager().afterStoreOperations(new IOCallback() {
+            @Override
+            public void done() {
+               try {
+                  logger.debug("preAcknowledge::afterStoreOperation for 
messageReference {}", ref);
+                  route(server, messageCommand);
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+            }
+
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+            }
+         });
+      }
+   }
+
+   private MirrorACKOperation getAckOperation(Transaction tx) {
+      MirrorACKOperation ackOperation = (MirrorACKOperation) 
tx.getProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION);
+      if (ackOperation == null) {
+         logger.trace("getAckOperation::setting operation on transaction {}", 
tx);
+         ackOperation = new MirrorACKOperation(server);
+         tx.putProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION, 
ackOperation);
+         tx.afterStore(ackOperation);
+      }
+
+      return ackOperation;
+   }
+
+   private MirrorSendOperation getSendOperation(Transaction tx) {
+      if (tx == null) {
+         return null;
+      }
+      MirrorSendOperation sendOperation = (MirrorSendOperation) 
tx.getProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION);
+      if (sendOperation == null) {
+         logger.trace("getSendOperation::setting operation on transaction {}", 
tx);
+         sendOperation = new MirrorSendOperation();
+         tx.putProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION, 
sendOperation);
+         tx.afterStore(sendOperation);
+      }
+
+      return sendOperation;
+   }
+
+   private static class MirrorACKOperation extends 
TransactionOperationAbstract {
+
+      final ActiveMQServer server;
+
+      // This map contains the Message used to generate the command towards 
the target, the reference being acked
+      final HashMap<Message, MessageReference> acks = new HashMap<>();
+
+      MirrorACKOperation(ActiveMQServer server) {
+         this.server = server;
+      }
+
+      /**
+       *
+       * @param message the message with the instruction to ack on the target 
node. Notice this is not the message owned by the reference.
+       * @param ref the reference being acked
+       */
+      public void addMessage(Message message, MessageReference ref) {
+         acks.put(message, ref);
+      }
+
+      @Override
+      public void beforeCommit(Transaction tx) {
+         logger.debug("MirrorACKOperation::beforeCommit processing {}", acks);
+         acks.forEach(this::doBeforeCommit);
+      }
+
+      // callback to be used on forEach
+      private void doBeforeCommit(Message ack, MessageReference ref) {
+         OperationContext context = (OperationContext) 
ack.getUserContext(OperationContext.class);
+         if (context != null) {
+            context.replicationLineUp();
+         }
+      }
+
+      @Override
+      public void afterCommit(Transaction tx) {
+         logger.debug("MirrorACKOperation::afterCommit processing {}", acks);
+         acks.forEach(this::doAfterCommit);
+      }
+
+      // callback to be used on forEach
+      private void doAfterCommit(Message ack, MessageReference ref) {
+         try {
+            route(server, ack);
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+         }
+         ref.getMessage().usageDown();
+      }
+
+      @Override
+      public void afterRollback(Transaction tx) {
+         acks.forEach(this::doAfterRollback);
+      }
+
+      // callback to be used on forEach
+      private void doAfterRollback(Message ack, MessageReference ref) {
+         OperationContext context = (OperationContext) 
ack.getUserContext(OperationContext.class);
+         if (context != null) {
+            context.replicationDone();
+         }
+      }
+
+   }
+
+   private static final class MirrorSendOperation extends 
TransactionOperationAbstract {
+      final List<MessageReference> refs = new ArrayList<>();
+
+      public void addRef(MessageReference ref) {
+         refs.add(ref);
+      }
+
+      @Override
+      public void beforeCommit(Transaction tx) {
+         refs.forEach(this::doBeforeCommit);
+      }
+
+      // callback to be used on forEach
+      private void doBeforeCommit(MessageReference ref) {
+         OperationContext context = 
ref.getProtocolData(OperationContext.class);
+         if (context != null) {
+            context.replicationLineUp();
+         }
+      }
+
+      @Override
+      public void afterRollback(Transaction tx) {
+         logger.debug("MirrorSendOperation::afterRollback, refs:{}", refs);
+         refs.forEach(this::doBeforeRollback);
+      }
+
+      // forEach callback
+      private void doBeforeRollback(MessageReference ref) {
+         OperationContext localCTX = 
ref.getProtocolData(OperationContext.class);
+         if (localCTX != null) {
+            localCTX.replicationDone();
+         }
+      }
+
+      @Override
+      public void afterCommit(Transaction tx) {
+         logger.debug("MirrorSendOperation::afterCommit refs:{}", refs);
+         refs.forEach(this::doAfterCommit);
+      }
+
+      // forEach callback
+      private void doAfterCommit(MessageReference ref) {
+         PostOfficeImpl.processReference(ref, false);
       }
-      Message message = createMessage(ref.getQueue().getAddress(), 
ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
-      route(server, message);
-      ref.getMessage().usageDown();
    }
 
    private Message createMessage(SimpleString address, SimpleString queue, 
Object event, String brokerID, Object body) {
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
index 74edd0a42e..1140f5c8b4 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
 
-import java.util.List;
 import java.util.function.BooleanSupplier;
 import java.util.function.ToIntFunction;
 
@@ -279,6 +278,11 @@ public class AMQPMirrorControllerTarget extends 
ProtonAbstractReceiver implement
       return AddressInfo.fromJSON(body);
    }
 
+   @Override
+   public void preAcknowledge(Transaction tx, MessageReference ref, AckReason 
reason) throws Exception {
+      // NO-OP
+   }
+
    @Override
    public void addAddress(AddressInfo addressInfo) throws Exception {
       logger.debug("{} adding address {}", server, addressInfo);
@@ -359,10 +363,11 @@ public class AMQPMirrorControllerTarget extends 
ProtonAbstractReceiver implement
    }
 
    private void performAck(String nodeID, long messageID, Queue targetQueue, 
ACKMessageOperation ackMessageOperation, AckReason reason, final short retry) {
+      MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, 
messageID, referenceNodeStore);
+
       if (logger.isTraceEnabled()) {
-         logger.trace("performAck (nodeID={}, messageID={}), targetQueue={}", 
nodeID, messageID, targetQueue.getName());
+         logger.trace("performAck (nodeID={}, messageID={}), targetQueue={}). 
Ref={}", nodeID, messageID, targetQueue.getName(), reference);
       }
-      MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, 
messageID, referenceNodeStore);
 
       if (reference == null) {
          if (logger.isDebugEnabled()) {
@@ -490,7 +495,7 @@ public class AMQPMirrorControllerTarget extends 
ProtonAbstractReceiver implement
    }
 
    @Override
-   public void sendMessage(Message message, RoutingContext context, 
List<MessageReference> refs) {
+   public void sendMessage(Transaction tx, Message message, RoutingContext 
context) {
       // Do nothing
    }
 
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java
index b0a8605fa4..d82560cace 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java
@@ -19,6 +19,7 @@ package 
org.apache.activemq.artemis.protocol.amqp.connect.mirror;
 import java.util.HashMap;
 
 import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.utils.collections.NodeStore;
@@ -112,7 +113,12 @@ public class ReferenceNodeStore implements 
NodeStore<MessageReference> {
    }
 
    public String getServerID(MessageReference element) {
-      Object nodeID = 
element.getMessage().getBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY);
+      return getServerID(element.getMessage());
+   }
+
+
+   public String getServerID(Message message) {
+      Object nodeID = 
message.getBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY);
       if (nodeID != null) {
          return nodeID.toString();
       } else {
@@ -124,7 +130,8 @@ public class ReferenceNodeStore implements 
NodeStore<MessageReference> {
    }
 
    public long getID(MessageReference element) {
-      Long id = (Long) 
element.getMessage().getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY);
+      Message message = element.getMessage();
+      Long id = getID(message);
       if (id == null) {
          return element.getMessageID();
       } else {
@@ -132,6 +139,10 @@ public class ReferenceNodeStore implements 
NodeStore<MessageReference> {
       }
    }
 
+   private Long getID(Message message) {
+      return (Long)message.getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY);
+   }
+
    @Override
    public synchronized void clear() {
       lists.forEach((k, v) -> v.clear());
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 2d85abaf84..98d62c46cb 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -683,14 +683,9 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
 
          frameBuffer.clear();
 
-         DeliveryAnnotations deliveryAnnotationsToEncode;
          message.checkReference(reference);
 
-         if (reference.getProtocolData() != null && 
reference.getProtocolData() instanceof DeliveryAnnotations) {
-            deliveryAnnotationsToEncode = 
(DeliveryAnnotations)reference.getProtocolData();
-         } else {
-            deliveryAnnotationsToEncode = null;
-         }
+         DeliveryAnnotations deliveryAnnotationsToEncode = 
reference.getProtocolData(DeliveryAnnotations.class);
 
          try {
             replaceInitialHeader(deliveryAnnotationsToEncode, context, new 
NettyWritable(frameBuffer));
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index d064d5036c..2db8d835d0 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -295,7 +295,7 @@ public class AMQConsumer {
          //handleDeliver is performed by an executor (see JBPAPP-6030): any 
AMQConsumer can share the session.wireFormat()
          dispatch = OpenWireMessageConverter.createMessageDispatch(reference, 
message, session.wireFormat(), this, 
session.getCoreServer().getNodeManager().getUUID());
          int size = dispatch.getMessage().getSize();
-         reference.setProtocolData(dispatch.getMessage().getMessageId());
+         reference.setProtocolData(MessageId.class, 
dispatch.getMessage().getMessageId());
          session.deliverMessage(dispatch);
          currentWindow.decrementAndGet();
          return size;
@@ -337,7 +337,7 @@ public class AMQConsumer {
 
       // if it's browse only, nothing to be acked
       final boolean removeReferences = !serverConsumer.isBrowseOnly() && 
!serverConsumer.getQueue().isNonDestructive();
-      final List<MessageReference> ackList = 
serverConsumer.scanDeliveringReferences(removeReferences, reference -> 
startID.equals(reference.getProtocolData()), reference -> 
lastID.equals(reference.getProtocolData()));
+      final List<MessageReference> ackList = 
serverConsumer.scanDeliveringReferences(removeReferences, reference -> 
startID.equals(reference.getProtocolData(MessageId.class)), reference -> 
lastID.equals(reference.getProtocolData(MessageId.class)));
 
       if (!ackList.isEmpty() || !removeReferences || 
serverConsumer.getQueue().isTemporary()) {
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java
index 227b669fc7..f71ee913a1 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java
@@ -28,6 +28,8 @@ public class AMQPMirrorBrokerConnectionElement extends 
AMQPBrokerConnectionEleme
 
    boolean messageAcknowledgements = true;
 
+   boolean sync = false;
+
    SimpleString mirrorSNF;
 
    String addressFilter;
@@ -98,4 +100,12 @@ public class AMQPMirrorBrokerConnectionElement extends 
AMQPBrokerConnectionEleme
       return this;
    }
 
+   public boolean isSync() {
+      return sync;
+   }
+
+   public AMQPMirrorBrokerConnectionElement setSync(boolean sync) {
+      this.sync = sync;
+      return this;
+   }
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 669d76d94b..3bdb0ca706 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -2121,10 +2121,11 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
                boolean queueCreation = 
getBooleanAttribute(e2,"queue-creation", true);
                boolean durable = getBooleanAttribute(e2, "durable", true);
                boolean queueRemoval = getBooleanAttribute(e2, "queue-removal", 
true);
+               boolean sync = getBooleanAttribute(e2, "sync", false);
                String addressFilter = getAttributeValue(e2, "address-filter");
 
                AMQPMirrorBrokerConnectionElement amqpMirrorConnectionElement = 
new AMQPMirrorBrokerConnectionElement();
-               
amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable).setAddressFilter(addressFilter);
+               
amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable).setAddressFilter(addressFilter).setSync(sync);
                connectionElement = amqpMirrorConnectionElement;
                
connectionElement.setType(AMQPBrokerConnectionAddressType.MIRROR);
             } else {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 55c38c820a..0abc464360 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -26,14 +26,14 @@ import 
org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.impl.AbstractProtocolReference;
 import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.lang.invoke.MethodHandles;
 
-public class PagedReferenceImpl extends 
LinkedListImpl.Node<PagedReferenceImpl> implements PagedReference, Runnable {
+public class PagedReferenceImpl extends AbstractProtocolReference implements 
PagedReference, Runnable {
 
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
@@ -75,8 +75,6 @@ public class PagedReferenceImpl extends 
LinkedListImpl.Node<PagedReferenceImpl>
 
    private boolean alreadyAcked;
 
-   private Object protocolData;
-
    //0 is false, 1 is true, 2 not defined
    private static final byte IS_NOT_LARGE_MESSAGE = 0;
    private static final byte IS_LARGE_MESSAGE = 1;
@@ -97,16 +95,6 @@ public class PagedReferenceImpl extends 
LinkedListImpl.Node<PagedReferenceImpl>
    private static final byte UNDEFINED_IS_DURABLE = -1;
    private byte durable = UNDEFINED_IS_DURABLE;
 
-   @Override
-   public Object getProtocolData() {
-      return protocolData;
-   }
-
-   @Override
-   public void setProtocolData(Object protocolData) {
-      this.protocolData = protocolData;
-   }
-
    @Override
    public Message getMessage() {
       return getPagedMessage().getMessage();
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
index 91681ff078..ceb402809c 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
@@ -177,8 +177,12 @@ public class OperationContextImpl implements 
OperationContext {
                }
             } else {
                if (storeOnly) {
-                  assert !storeOnlyTasks.isEmpty() ? 
storeOnlyTasks.peekLast().storeLined <= storeLined : true;
-                  storeOnlyTasks.add(new StoreOnlyTaskHolder(completion, 
storeLined));
+                  if (storeLined == stored && 
EXECUTORS_PENDING_UPDATER.get(this) == 0) {
+                     executeNow = true;
+                  } else {
+                     assert !storeOnlyTasks.isEmpty() ? 
storeOnlyTasks.peekLast().storeLined <= storeLined : true;
+                     storeOnlyTasks.add(new StoreOnlyTaskHolder(completion, 
storeLined));
+                  }
                } else {
                   // ensure total ordering
                   assert validateTasksAdd(storeLined, replicationLined, 
pageLined);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
index 8f82a172ea..4c368be6b9 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
@@ -224,4 +224,8 @@ public interface PostOffice extends ActiveMQComponent {
    default AddressManager getAddressManager() {
       return null;
    }
+
+   default void preAcknowledge(final Transaction tx, final MessageReference 
ref, AckReason reason) {
+   }
+
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index b9c60f1fb3..3950a4dcaa 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -246,6 +246,18 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
       return this;
    }
 
+   @Override
+   public void preAcknowledge(final Transaction tx, final MessageReference 
ref, AckReason reason) {
+      if (mirrorControllerSource != null && reason != AckReason.REPLACED) { // 
we don't send replacements on LVQ as they are replaced themselves on the target
+         try {
+            mirrorControllerSource.preAcknowledge(tx, ref, reason);
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+         }
+      }
+   }
+
+
    @Override
    public void postAcknowledge(MessageReference ref, AckReason reason) {
       if (mirrorControllerSource != null && reason != AckReason.REPLACED) { // 
we don't send replacements on LVQ as they are replaced themselves on the target
@@ -1624,7 +1636,7 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
 
       if (mirrorControllerSource != null && !context.isMirrorDisabled()) {
          // we check for isMirrorDisabled as to avoid recursive loop from there
-         mirrorControllerSource.sendMessage(message, context, refs);
+         mirrorControllerSource.sendMessage(tx, message, context);
       }
 
 
@@ -1647,10 +1659,12 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
       }
    }
 
-   private static void processReferences(List<MessageReference> refs, boolean 
direct) {
-      for (MessageReference ref : refs) {
-         ref.getQueue().addTail(ref, direct);
-      }
+   public static void processReferences(List<MessageReference> refs, boolean 
direct) {
+      refs.forEach((ref) -> processReference(ref, direct));
+   }
+
+   public static void processReference(MessageReference ref, boolean direct) {
+      ref.getQueue().addTail(ref, direct);
    }
 
    private void processRouteToDurableQueues(final Message message,
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
index daf4f90db1..dfb0c17b50 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
@@ -74,13 +74,13 @@ public interface MessageReference {
     * To be used on holding protocol specific data during the delivery.
     * This will be only valid while the message is on the delivering queue at 
the consumer
     */
-   Object getProtocolData();
+   <T> T getProtocolData(Class<T> typeClass);
 
    /**
     * To be used on holding protocol specific data during the delivery.
     * This will be only valid while the message is on the delivering queue at 
the consumer
     */
-   void setProtocolData(Object data);
+   <T> void setProtocolData(Class<T> typeClass, T data);
 
    MessageReference copy(Queue queue);
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AbstractProtocolReference.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AbstractProtocolReference.java
new file mode 100644
index 0000000000..35f3a07548
--- /dev/null
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AbstractProtocolReference.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.impl;
+
+import java.util.HashMap;
+
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
+
+/** I need to store protocol specific data on the references. The same need 
exists in both PagedReference and MessageReferenceImpl.
+ *  This class will serve the purpose to keep the specific protocol data for 
either reference.
+ *  */
+public abstract class AbstractProtocolReference extends 
LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference {
+
+   private HashMap<Class, Object> protocolDataMap;
+
+   @Override
+   public <T> T getProtocolData(Class<T> classType) {
+      if (protocolDataMap == null) {
+         return null;
+      } else {
+         return (T)protocolDataMap.get(classType);
+      }
+   }
+
+   @Override
+   public <T> void setProtocolData(Class<T> classType, T protocolData) {
+      if (protocolDataMap == null) {
+         protocolDataMap = new HashMap<>();
+      }
+      protocolDataMap.put(classType, protocolData);
+   }
+
+}
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java
index c0b0e02747..10163bdacd 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java
@@ -89,13 +89,13 @@ public class GroupFirstMessageReference implements 
MessageReference {
    }
 
    @Override
-   public Object getProtocolData() {
-      return messageReference.getProtocolData();
+   public <T> T getProtocolData(Class<T> typeClass) {
+      return messageReference.getProtocolData(typeClass);
    }
 
    @Override
-   public void setProtocolData(Object data) {
-      messageReference.setProtocolData(data);
+   public <T> void setProtocolData(Class<T> typeClass, T data) {
+      messageReference.setProtocolData(typeClass, data);
    }
 
    @Override
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index 262689aa53..6d39c40372 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -27,12 +27,11 @@ import 
org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
 
 /**
  * Implementation of a MessageReference
  */
-public class MessageReferenceImpl extends 
LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference, Runnable 
{
+public class MessageReferenceImpl extends AbstractProtocolReference implements 
MessageReference, Runnable {
 
    private static final MessageReferenceComparatorByID idComparator = new 
MessageReferenceComparatorByID();
 
@@ -78,8 +77,6 @@ public class MessageReferenceImpl extends 
LinkedListImpl.Node<MessageReferenceIm
 
    private boolean deliveredDirectly;
 
-   private Object protocolData;
-
    private Consumer<? super MessageReference> onDelivery;
 
 
@@ -138,15 +135,7 @@ public class MessageReferenceImpl extends 
LinkedListImpl.Node<MessageReferenceIm
       }
    }
 
-   @Override
-   public Object getProtocolData() {
-      return protocolData;
-   }
 
-   @Override
-   public void setProtocolData(Object protocolData) {
-      this.protocolData = protocolData;
-   }
 
    /**
     * @return the persistedCount
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index bbc90f56c9..e664cbc998 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1877,9 +1877,11 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       }
 
       if (logger.isTraceEnabled()) {
-         logger.trace("{} acknowledge tx={} ref={}, reason={}, consumer={}", 
this, transactional, ref, reason, consumer);
+         logger.trace("queue.acknowledge serverIdentity={}, queue={} 
acknowledge tx={} ref={}, reason={}, consumer={}", server.getIdentity(), 
this.getName(), transactional, ref, reason, consumer);
       }
 
+      postOffice.preAcknowledge(tx, ref, reason);
+
       if (nonDestructive && reason == AckReason.NORMAL) {
          if (transactional) {
             refsOperation.addOnlyRefAck(ref);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorController.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorController.java
index 5532d0bf9c..9ecf8d1b17 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorController.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorController.java
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.artemis.core.server.mirror;
 
-import java.util.List;
-
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -25,7 +23,7 @@ import 
org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
-
+import org.apache.activemq.artemis.core.transaction.Transaction;
 
 /**
  * This represents the contract we will use to send messages to replicas.
@@ -35,9 +33,10 @@ public interface MirrorController {
    void deleteAddress(AddressInfo addressInfo) throws Exception;
    void createQueue(QueueConfiguration queueConfiguration) throws Exception;
    void deleteQueue(SimpleString addressName, SimpleString queueName) throws 
Exception;
-   void sendMessage(Message message, RoutingContext context, 
List<MessageReference> refs);
+   void sendMessage(Transaction tx, Message message, RoutingContext context);
 
    void postAcknowledge(MessageReference ref, AckReason reason) throws 
Exception;
+   void preAcknowledge(Transaction tx, MessageReference ref, AckReason reason) 
throws Exception;
 
    String getRemoteMirrorId();
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionPropertyIndexes.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionPropertyIndexes.java
index d6c332a6b5..813ef5a098 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionPropertyIndexes.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionPropertyIndexes.java
@@ -37,4 +37,8 @@ public class TransactionPropertyIndexes {
    public static final int EXPIRY_LOGGER = 9;
 
    public static final int CONSUMER_METRICS_OPERATION = 10;
+
+   public static final int MIRROR_ACK_OPERATION = 11;
+
+   public static final int MIRROR_SEND_OPERATION = 12;
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
index 13ee0ba73e..536dbf1b42 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
@@ -18,11 +18,11 @@ package org.apache.activemq.artemis.core.transaction.impl;
 
 import javax.transaction.xa.Xid;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Date;
 import java.util.LinkedList;
 import java.util.List;
 
+import io.netty.util.collection.IntObjectHashMap;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
@@ -50,7 +50,7 @@ public class TransactionImpl implements Transaction {
 
    private static final int INITIAL_NUM_PROPERTIES = 11;
 
-   private Object[] properties = null;
+   private IntObjectHashMap properties = null;
 
    protected final StorageManager storageManager;
 
@@ -72,22 +72,6 @@ public class TransactionImpl implements Transaction {
 
    private Object protocolData;
 
-   private void ensurePropertiesCapacity(int capacity) {
-      if (properties != null && properties.length >= capacity) {
-         return;
-      }
-      createOrEnlargeProperties(capacity);
-   }
-
-   private void createOrEnlargeProperties(int capacity) {
-      if (properties == null) {
-         properties = new 
Object[Math.min(TransactionImpl.INITIAL_NUM_PROPERTIES, capacity)];
-      } else {
-         assert properties.length < capacity;
-         properties = Arrays.copyOf(properties, capacity);
-      }
-   }
-
    @Override
    public Object getProtocolData() {
       return protocolData;
@@ -529,14 +513,17 @@ public class TransactionImpl implements Transaction {
 
    @Override
    public void putProperty(final int index, final Object property) {
-      ensurePropertiesCapacity(index + 1);
 
-      properties[index] = property;
+      if (properties == null) {
+         properties = new IntObjectHashMap();
+      }
+
+      properties.put(index, property);
    }
 
    @Override
    public Object getProperty(final int index) {
-      return properties == null ? null : (index < properties.length ? 
properties[index] : null);
+      return properties == null ? null : properties.get(index);
    }
 
    // Private
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd 
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index cc8c2cd538..9154d510fd 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -2447,6 +2447,14 @@
             </xsd:documentation>
          </xsd:annotation>
       </xsd:attribute>
+      <xsd:attribute name="sync" type="xsd:boolean" use="optional" 
default="false">
+         <xsd:annotation>
+            <xsd:documentation>
+               If this is true, client blocking operations will wait for a 
response from the mirror before unblocking the operation.
+               This is false by default.
+            </xsd:documentation>
+         </xsd:annotation>
+      </xsd:attribute>
       <xsd:attribute name="address-filter" type="xsd:string" use="optional">
          <xsd:annotation>
             <xsd:documentation>
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
index aa5088452e..581cc78ea8 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
@@ -709,6 +709,11 @@ public class ConfigurationImplTest extends 
ActiveMQTestBase {
 
    @Test
    public void testAMQPConnectionsConfiguration() throws Throwable {
+      testAMQPConnectionsConfiguration(true);
+      testAMQPConnectionsConfiguration(false);
+   }
+
+   private void testAMQPConnectionsConfiguration(boolean sync) throws 
Throwable {
       ConfigurationImpl configuration = new ConfigurationImpl();
 
       Properties insertionOrderedProperties = new 
ConfigurationImpl.InsertionOrderedProperties();
@@ -723,6 +728,9 @@ public class ConfigurationImplTest extends ActiveMQTestBase 
{
       
insertionOrderedProperties.put("AMQPConnections.target.connectionElements.mirror.queueCreation",
 "true");
       
insertionOrderedProperties.put("AMQPConnections.target.connectionElements.mirror.queueRemoval",
 "true");
       
insertionOrderedProperties.put("AMQPConnections.target.connectionElements.mirror.addressFilter",
 "foo");
+      if (sync) {
+         
insertionOrderedProperties.put("AMQPConnections.target.connectionElements.mirror.sync",
 "true");
+      } // else we just use the default that is false
 
       configuration.parsePrefixedProperties(insertionOrderedProperties, null);
 
@@ -742,6 +750,7 @@ public class ConfigurationImplTest extends ActiveMQTestBase 
{
       Assert.assertEquals(true, 
amqpMirrorBrokerConnectionElement.isMessageAcknowledgements());
       Assert.assertEquals(true, 
amqpMirrorBrokerConnectionElement.isQueueCreation());
       Assert.assertEquals(true, 
amqpMirrorBrokerConnectionElement.isQueueRemoval());
+      Assert.assertEquals(sync, ((AMQPMirrorBrokerConnectionElement) 
amqpBrokerConnectionElement).isSync());
       Assert.assertEquals("foo", 
amqpMirrorBrokerConnectionElement.getAddressFilter());
    }
 
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml 
b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 8fec6467f4..8adcc93cf2 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -442,7 +442,7 @@
             <receiver address-match="TEST-RECEIVER" />
             <peer address-match="TEST-PEER"/>
             <receiver queue-name="TEST-WITH-QUEUE-NAME"/>
-            <mirror message-acknowledgements="false" queue-creation="false" 
durable="false" queue-removal="false" 
address-filter="TEST-QUEUE,!IGNORE-QUEUE"/>
+            <mirror message-acknowledgements="false" queue-creation="false" 
durable="false" queue-removal="false" address-filter="TEST-QUEUE,!IGNORE-QUEUE" 
sync="true"/>
          </amqp-connection>
          <amqp-connection uri="tcp://test2:222" name="test2">
             <mirror durable="false"/>
diff --git a/docs/user-manual/en/amqp-broker-connections.md 
b/docs/user-manual/en/amqp-broker-connections.md
index 667ab8ceff..68784e7eee 100644
--- a/docs/user-manual/en/amqp-broker-connections.md
+++ b/docs/user-manual/en/amqp-broker-connections.md
@@ -81,8 +81,7 @@ The previous example portrays a case of connection failure 
towards ServerA. The
 <div style="page-break-after: always"></div>
 
 ## Mirroring
-The idea of mirroring is to send events that happen on a broker towards 
another broker, without blocking any operations from producers and consumers, 
allowing them to keep operating as fast as possible.
-It can be used for Disaster Recovery, and depending on the requirements even 
for failing over the data.
+Mirroring will reproduce any operation that happened on the source brokers 
towards a target broker.
 
 The following events are sent through mirroring:
 
@@ -94,6 +93,8 @@ The following events are sent through mirroring:
 * Queue and address creation.
 * Queue and address deletion.
 
+By default every operation is sent asynchronously without blocking any 
clients. However, if you set sync="true" on the mirror configuration, the 
clients will always wait for the mirror on every blocking operation.
+
 ### Mirror configuration
 
 Add a `<mirror>` element within the `<amqp-connection>` element to configure 
mirroring to the target broker.
@@ -119,9 +120,10 @@ The following optional arguments can be utilized:
     matches all addresses starting with 'eu' but not those starting with 
'eu.uk'
 
   **Note:**
-
   - Address exclusion will always take precedence over address inclusion.
   - Address matching on mirror elements is prefix-based and does not support 
wild-card matching.
+* `sync`: Defaults to false. If set to true, any blocking client operation 
will be held until the mirror has confirmed receiving the operation.
+  * Note that a disconnected node would hold all operations from the client. 
If you set sync=true, you must reconnect the mirror before performing any 
operations.
 
 An example of a mirror configuration is shown below:
 ```xml
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReferenceDeliveryAnnotationTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReferenceDeliveryAnnotationTest.java
index 24ddf08d71..7c88310fbf 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReferenceDeliveryAnnotationTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReferenceDeliveryAnnotationTest.java
@@ -77,7 +77,7 @@ public class AmqpReferenceDeliveryAnnotationTest extends 
AmqpClientTestSupport {
             Map<Symbol, Object> symbolObjectMap = new HashMap<>();
             DeliveryAnnotations deliveryAnnotations = new 
DeliveryAnnotations(symbolObjectMap);
             symbolObjectMap.put(Symbol.getSymbol("KEY"), uuid);
-            reference.setProtocolData(deliveryAnnotations);
+            reference.setProtocolData(DeliveryAnnotations.class, 
deliveryAnnotations);
          }
       });
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
index c717bdaea5..841c8ec1a6 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
@@ -681,9 +681,11 @@ public class AMQPReplicaTest extends AmqpClientTestSupport 
{
       server_2.getConfiguration().setName("thisone");
 
       AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration(brokerConnectionName, "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
-      AMQPMirrorBrokerConnectionElement replica = new 
AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(acks);
+      AMQPMirrorBrokerConnectionElement replica = new 
AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(acks).setDurable(true);
+      replica.setName("theReplica");
       amqpConnection.addElement(replica);
       server_2.getConfiguration().addAMQPConnection(amqpConnection);
+      server_2.getConfiguration().setName("server_2");
 
       int NUMBER_OF_MESSAGES = 200;
 
@@ -698,7 +700,6 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
       Connection connection = factory.createConnection();
       Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
       MessageProducer producer = 
session.createProducer(session.createQueue(getQueueName()));
-      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 
       if (!deferredStart) {
          Queue queueOnServer1 = locateQueue(server, getQueueName());
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSyncMirrorTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSyncMirrorTest.java
new file mode 100644
index 0000000000..5b399cfe33
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSyncMirrorTest.java
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.JournalUpdateCallback;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import 
org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
+import 
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQPSyncMirrorTest extends AmqpClientTestSupport {
+
+   Logger logger = LoggerFactory.getLogger(AMQPSyncMirrorTest.class);
+
+   private static final String SLOW_SERVER_NAME = "slow";
+   private static final int SLOW_SERVER_PORT = AMQP_PORT + 1;
+
+   private ActiveMQServer slowServer;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+   }
+
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,OPENWIRE,CORE";
+   }
+
+   @Test
+   public void testPersistedSendAMQP() throws Exception {
+      testPersistedSend("AMQP", false, 100);
+   }
+
+   @Test
+   public void testPersistedSendAMQPLarge() throws Exception {
+      testPersistedSend("AMQP", false, 200 * 1024);
+   }
+
+
+   @Test
+   public void testPersistedSendCore() throws Exception {
+      testPersistedSend("CORE", false, 100);
+   }
+
+   @Test
+   public void testPersistedSendCoreLarge() throws Exception {
+      testPersistedSend("CORE", false, 200 * 1024);
+   }
+
+   @Test
+   public void testPersistedSendAMQPTXLarge() throws Exception {
+      testPersistedSend("AMQP", true, 200 * 1024);
+   }
+
+   @Test
+   public void testPersistedSendAMQPTX() throws Exception {
+      testPersistedSend("AMQP", true, 100);
+   }
+
+   @Test
+   public void testPersistedSendCoreTX() throws Exception {
+      testPersistedSend("CORE", true, 100);
+   }
+
+   @Test
+   public void testPersistedSendCoreTXLarge() throws Exception {
+      testPersistedSend("CORE", true, 200 * 1024);
+   }
+
+   private void testPersistedSend(String protocol, boolean transactional, int 
messageSize) throws Exception {
+      ReusableLatch sendPending = new ReusableLatch(0);
+      Semaphore semSend = new Semaphore(1);
+      Semaphore semAck = new Semaphore(1);
+      AtomicInteger errors = new AtomicInteger(0);
+
+      try {
+         final int NUMBER_OF_MESSAGES = 10;
+
+         AtomicInteger countStored = new AtomicInteger(0);
+
+         slowServer = createServerWithCallbackStorage(SLOW_SERVER_PORT, 
SLOW_SERVER_NAME, (isUpdate, isTX, txId, id, recordType, persister, record) -> {
+            if (logger.isDebugEnabled()) {
+               logger.debug("StorageCallback::slow isUpdate={}, isTX={}, 
txID={}, id={},recordType={}, record={}", isUpdate, isTX, txId, id, recordType, 
record);
+            }
+            if (transactional) {
+               if (isTX) {
+                  try {
+                     if (countStored.get() > 0) {
+                        countStored.incrementAndGet();
+                        logger.debug("semSend.tryAcquire");
+                        if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
+                           logger.debug("acquired TX, now release");
+                           semSend.release();
+                        }
+                     }
+                  } catch (Exception e) {
+                     logger.warn(e.getMessage(), e);
+                  }
+               }
+            }
+            if (recordType == JournalRecordIds.ACKNOWLEDGE_REF) {
+               logger.debug("slow ACK REF");
+               try {
+                  if (semAck.tryAcquire(20, TimeUnit.SECONDS)) {
+                     semAck.release();
+                     logger.debug("slow acquired ACK semaphore");
+                  } else {
+                     logger.debug("Semaphore wasn't acquired");
+                  }
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+            }
+            if (recordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL) {
+               try {
+                  countStored.incrementAndGet();
+                  if (!transactional) {
+                     logger.debug("semSend.tryAcquire");
+                     if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
+                        logger.debug("acquired non TX now release");
+                        semSend.release();
+                     }
+                  }
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+                  errors.incrementAndGet();
+               }
+            }
+         });
+         slowServer.setIdentity("slowServer");
+         server.setIdentity("server");
+
+         ExecutorService pool = Executors.newFixedThreadPool(5);
+         runAfter(pool::shutdown);
+
+         configureMirrorTowardsSlow(server);
+
+         slowServer.getConfiguration().setName("slow");
+         server.getConfiguration().setName("fast");
+         slowServer.start();
+         server.start();
+
+         waitForServerToStart(slowServer);
+         waitForServerToStart(server);
+
+         server.addAddressInfo(new 
AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+         server.createQueue(new 
QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
+
+         Wait.waitFor(() -> slowServer.locateQueue(getQueueName()) != null);
+         Queue replicatedQueue = slowServer.locateQueue(getQueueName());
+
+         ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + AMQP_PORT);
+
+         if (factory instanceof ActiveMQConnectionFactory) {
+            ((ActiveMQConnectionFactory) 
factory).getServerLocator().setBlockOnAcknowledge(true);
+         }
+
+         Connection connection = factory.createConnection();
+         runAfter(connection::close);
+         Session session = connection.createSession(transactional, 
transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
+         MessageProducer producer = 
session.createProducer(session.createQueue(getQueueName()));
+
+         connection.start();
+
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         final String bodyMessage;
+         {
+            StringBuffer buffer = new StringBuffer();
+            for (int i = 0; i < messageSize; i++) {
+               buffer.append("large Buffer...");
+            }
+            bodyMessage = buffer.toString();
+         }
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            logger.debug("===>>> send message {}", i);
+            int theI = i;
+            sendPending.countUp();
+            logger.debug("semSend.acquire");
+            semSend.acquire();
+            if (!transactional) {
+               pool.execute(() -> {
+                  try {
+                     logger.debug("Entering non TX send with sendPending = 
{}", sendPending.getCount());
+                     TextMessage message = 
session.createTextMessage(bodyMessage);
+                     message.setStringProperty("strProperty", "" + theI);
+                     producer.send(message);
+                     sendPending.countDown();
+                     logger.debug("leaving non TX send with sendPending = {}", 
sendPending.getCount());
+                  } catch (Throwable e) {
+                     logger.warn(e.getMessage(), e);
+                     errors.incrementAndGet();
+                  }
+               });
+            } else {
+               CountDownLatch sendDone = new CountDownLatch(1);
+               pool.execute(() -> {
+                  try {
+                     TextMessage message = 
session.createTextMessage(bodyMessage);
+                     message.setStringProperty("strProperty", "" + theI);
+                     producer.send(message);
+                  } catch (Throwable e) {
+                     errors.incrementAndGet();
+                     logger.warn(e.getMessage(), e);
+                  }
+                  sendDone.countDown();
+               });
+
+               Wait.assertEquals(i, replicatedQueue::getMessageCount);
+
+               Assert.assertTrue(sendDone.await(10, TimeUnit.SECONDS));
+
+               pool.execute(() -> {
+                  try {
+                     session.commit();
+                     sendPending.countDown();
+                  } catch (Throwable e) {
+                     logger.warn(e.getMessage(), e);
+                  }
+               });
+            }
+
+            Assert.assertFalse("sendPending.await() not supposed to succeed", 
sendPending.await(10, TimeUnit.MILLISECONDS));
+            logger.debug("semSend.release");
+            semSend.release();
+            Assert.assertTrue(sendPending.await(10, TimeUnit.SECONDS));
+            Wait.assertEquals(i + 1, replicatedQueue::getMessageCount);
+         }
+
+         if (!transactional) {
+            Wait.assertEquals(NUMBER_OF_MESSAGES, countStored::get);
+         }
+         Wait.assertEquals(NUMBER_OF_MESSAGES, 
replicatedQueue::getMessageCount);
+
+         connection.start();
+
+         Session clientSession = transactional ? 
connection.createSession(true, Session.AUTO_ACKNOWLEDGE) : 
connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         MessageConsumer consumer = 
clientSession.createConsumer(clientSession.createQueue(getQueueName()));
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            logger.debug("===<<< Receiving message {}", i);
+            Message message = consumer.receive(5000);
+            Assert.assertNotNull(message);
+            semAck.acquire();
+
+            CountDownLatch countDownLatch = new CountDownLatch(1);
+            pool.execute(() -> {
+               try {
+                  if (transactional) {
+                     clientSession.commit();
+                  } else {
+                     message.acknowledge();
+                  }
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+                  errors.incrementAndGet();
+               } finally {
+                  countDownLatch.countDown();
+               }
+            });
+
+            if (!transactional && protocol.equals("AMQP")) {
+               // non transactional ack in AMQP is always async. No need to 
verify anything else here
+               logger.debug("non transactional and amqp is always 
asynchronous. No need to verify anything");
+            } else {
+               Assert.assertFalse(countDownLatch.await(10, 
TimeUnit.MILLISECONDS));
+            }
+
+            semAck.release();
+            Assert.assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
+            Wait.assertEquals(NUMBER_OF_MESSAGES - i - 1, 
replicatedQueue::getMessageCount);
+         }
+
+         Assert.assertEquals(0, errors.get());
+      } finally {
+         semAck.release();
+         semSend.release();
+      }
+   }
+
+   @Override
+   protected ActiveMQServer createServer() throws Exception {
+      ActiveMQServer server = createServerWithCallbackStorage(AMQP_PORT, 
"fastServer", (isUpdate, isTX, txId, id, recordType, persister, record) -> {
+         if (logger.isDebugEnabled()) {
+            logger.debug("StorageCallback::fast isUpdate={}, isTX={}, txID={}, 
id={},recordType={}, record={}", isUpdate, isTX, txId, id, recordType, record);
+         }
+      });
+      addServer(server);
+      return server;
+   }
+
+   private void configureMirrorTowardsSlow(ActiveMQServer source) {
+      AMQPBrokerConnectConfiguration connection = new 
AMQPBrokerConnectConfiguration("mirror", "tcp://localhost:" + 
SLOW_SERVER_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+      AMQPMirrorBrokerConnectionElement replication = new 
AMQPMirrorBrokerConnectionElement().setDurable(true).setSync(true).setMessageAcknowledgements(true);
+      connection.addElement(replication);
+
+      source.getConfiguration().addAMQPConnection(connection);
+   }
+
+   private ActiveMQServer createServerWithCallbackStorage(int port, String 
name, StorageCallback storageCallback) throws Exception {
+      ActiveMQSecurityManager securityManager = new 
ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), new 
SecurityConfiguration());
+      ActiveMQServer server = new ActiveMQServerImpl(createBasicConfig(port), 
mBeanServer, securityManager) {
+         @Override
+         protected StorageManager createStorageManager() {
+            return 
AMQPSyncMirrorTest.this.createCallbackStorageManager(getConfiguration(), 
getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, 
ioCriticalErrorListener, storageCallback);
+         }
+      };
+
+      server.getConfiguration().setName(name);
+      server.getConfiguration().getAcceptorConfigurations().clear();
+      
server.getConfiguration().getAcceptorConfigurations().add(addAcceptorConfiguration(slowServer,
 port));
+      server.getConfiguration().setMessageExpiryScanPeriod(-1);
+
+      server.getConfiguration().setJMXManagementEnabled(true);
+
+      configureAddressPolicy(server);
+      configureBrokerSecurity(server);
+
+      addServer(server);
+
+      return server;
+   }
+
+   private interface StorageCallback {
+      void storage(boolean isUpdate,
+                   boolean isCommit,
+                   long txID,
+                   long id,
+                   byte recordType,
+                   Persister persister,
+                   Object record);
+   }
+
+   private StorageManager createCallbackStorageManager(Configuration 
configuration,
+                                               CriticalAnalyzer 
criticalAnalyzer,
+                                               ExecutorFactory executorFactory,
+                                               ScheduledExecutorService 
scheduledPool,
+                                               ExecutorFactory 
ioExecutorFactory,
+                                               IOCriticalErrorListener 
ioCriticalErrorListener,
+                                               StorageCallback 
storageCallback) {
+      return new JournalStorageManager(configuration, criticalAnalyzer, 
executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener) {
+         @Override
+         protected Journal createMessageJournal(Configuration config,
+                                                IOCriticalErrorListener 
criticalErrorListener,
+                                                int fileSize) {
+            return new JournalImpl(ioExecutorFactory, fileSize, 
config.getJournalMinFiles(), config.getJournalPoolFiles(), 
config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), 
config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", 
journalFF.getMaxIO(), 0, criticalErrorListener, 
config.getJournalMaxAtticFiles()) {
+               @Override
+               public void appendAddRecordTransactional(long txID,
+                                                        long id,
+                                                        byte recordType,
+                                                        Persister persister,
+                                                        Object record) throws 
Exception {
+                  storageCallback.storage(false, false, txID, id, recordType, 
persister, record);
+                  super.appendAddRecordTransactional(txID, id, recordType, 
persister, record);
+               }
+
+               @Override
+               public void appendAddRecord(long id,
+                                           byte recordType,
+                                           Persister persister,
+                                           Object record,
+                                           boolean sync,
+                                           IOCompletion callback) throws 
Exception {
+                  storageCallback.storage(false, false,  -1, id, recordType, 
persister, record);
+                  super.appendAddRecord(id, recordType, persister, record, 
sync, callback);
+               }
+
+               @Override
+               public void appendUpdateRecord(long id,
+                                              byte recordType,
+                                              EncodingSupport record,
+                                              boolean sync) throws Exception {
+                  storageCallback.storage(true, false, -1, id, recordType, 
null, record);
+                  super.appendUpdateRecord(id, recordType, record, sync);
+               }
+
+               @Override
+               public void appendUpdateRecordTransactional(long txID,
+                                                           long id,
+                                                           byte recordType,
+                                                           EncodingSupport 
record) throws Exception {
+                  storageCallback.storage(true, false, txID, id, recordType, 
null, record);
+                  super.appendUpdateRecordTransactional(txID, id, recordType, 
record);
+               }
+
+               @Override
+               public void appendCommitRecord(long txID,
+                                              boolean sync,
+                                              IOCompletion callback,
+                                              boolean lineUpContext) throws 
Exception {
+                  storageCallback.storage(false, true, txID, txID, (byte)0, 
null, null);
+                  super.appendCommitRecord(txID, sync, callback, 
lineUpContext);
+               }
+
+               @Override
+               public void tryAppendUpdateRecord(long id,
+                                          byte recordType,
+                                          Persister persister,
+                                          Object record,
+                                          boolean sync,
+                                          boolean replaceableUpdate,
+                                          JournalUpdateCallback updateCallback,
+                                          IOCompletion callback) throws 
Exception {
+                     storageCallback.storage(true, false, -1, -1, recordType, 
persister, record);
+                     super.tryAppendUpdateRecord(id, recordType, persister, 
record, sync, replaceableUpdate, updateCallback, callback);
+               }
+            };
+         }
+      };
+   }
+}
\ No newline at end of file
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/config/impl/ConfigurationValidationTest.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/config/impl/ConfigurationValidationTest.java
index 0a690c2a78..8b332fdf02 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/config/impl/ConfigurationValidationTest.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/config/impl/ConfigurationValidationTest.java
@@ -94,6 +94,7 @@ public class ConfigurationValidationTest extends 
ActiveMQTestBase {
       Assert.assertFalse(mirrorConnectionElement.isQueueCreation());
       Assert.assertFalse(mirrorConnectionElement.isQueueRemoval());
       Assert.assertFalse(mirrorConnectionElement.isDurable());
+      Assert.assertTrue(mirrorConnectionElement.isSync());
 
 
       amqpBrokerConnectConfiguration = fc.getAMQPConnection().get(1);
@@ -104,6 +105,7 @@ public class ConfigurationValidationTest extends 
ActiveMQTestBase {
       Assert.assertFalse(mirrorConnectionElement.isDurable());
       Assert.assertTrue(mirrorConnectionElement.isQueueCreation());
       Assert.assertTrue(mirrorConnectionElement.isQueueRemoval());
+      Assert.assertFalse(mirrorConnectionElement.isSync()); // checking the 
default
 
       amqpBrokerConnectConfiguration = fc.getAMQPConnection().get(2);
       Assert.assertFalse(amqpBrokerConnectConfiguration.isAutostart());


Reply via email to