tabish121 commented on code in PR #4338:
URL: https://github.com/apache/activemq-artemis/pull/4338#discussion_r1081606585


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -239,37 +249,107 @@ public void sendMessage(Message message, RoutingContext 
context, List<MessageRef
       try {
          context.setReusable(false);
 
-         MessageReference ref = 
MessageReference.Factory.createReference(message, snfQueue);
-         String nodeID = setProtocolData(idSupplier, ref);
+         String nodeID = idSupplier.getServerID(message);
+
          if (nodeID != null && nodeID.equals(getRemoteMirrorId())) {
             logger.trace("Message {} already belonged to the node, {}, it 
won't circle send", message, getRemoteMirrorId());
             return;
          }
+
+         MessageReference ref = 
MessageReference.Factory.createReference(message, snfQueue);
+         setProtocolData(ref, nodeID, idSupplier.getID(ref));
+
          snfQueue.refUp(ref);
+         List<MessageReference> refs;
+
+         if (tx != null) {
+            if (logger.isDebugEnabled()) {
+               logger.debug("Mirroring Message " + message + " with TX");
+            }
+            MirrorSendOperation mirrorSendOperation = getSendOperation(tx);
+            refs = mirrorSendOperation.mirroredRefs;
+         } else {
+            refs = new LinkedList<>();
+         }
+
          refs.add(ref);
 
+         if (sync) {
+            OperationContext operContext = 
OperationContextImpl.getContext(server.getExecutorFactory());
+            if (tx == null) {
+               // notice that if transactional, the context is lined up on 
beforeCommit as part of the transaction operation
+               operContext.replicationLineUp();
+            }
+            if (logger.isDebugEnabled()) {

Review Comment:
   Gate check isn't needed here.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -318,28 +404,185 @@ public void postAcknowledge(MessageReference ref, 
AckReason reason) throws Excep
 
       if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() || 
ref.getQueue().isMirrorController()))) {
          if (logger.isDebugEnabled()) {
-            logger.debug("{} rejecting postAcknowledge queue={}, ref={} to 
avoid infinite loop with the mirror (reflection)", server, 
ref.getQueue().getName(), ref);
+            logger.debug("{} rejecting preAcknowledge queue={}, ref={} to 
avoid infinite loop with the mirror (reflection)", server, 
ref.getQueue().getName(), ref);
          }
          return;
       }
 
       if (ignoreAddress(ref.getQueue().getAddress())) {
          if (logger.isTraceEnabled()) {
-            logger.trace("{} rejecting postAcknowledge queue={}, ref={}, queue 
address is excluded", server, ref.getQueue().getName(), ref);
+            logger.trace("{} rejecting preAcknowledge queue={}, ref={}, queue 
address is excluded", server, ref.getQueue().getName(), ref);
          }
          return;
       }
 
-      logger.trace("{} postAcknowledge {}", server, ref);
+      logger.trace("{} preAcknowledge {}", server, ref);
 
       String nodeID = idSupplier.getServerID(ref); // notice the brokerID will 
be null for any message generated on this broker.
       long internalID = idSupplier.getID(ref);
-      if (logger.isTraceEnabled()) {
-         logger.trace("{} sending ack message from server {} with 
messageID={}", server, nodeID, internalID);
+      Message messageCommand = createMessage(ref.getQueue().getAddress(), 
ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
+      if (sync) {
+         OperationContext operationContext;
+         operationContext = 
OperationContextImpl.getContext(server.getExecutorFactory());
+         messageCommand.setUserContext(OperationContext.class, 
operationContext);
+         if (tx == null) {
+            // notice that if transactional, the context is lined up on 
beforeCommit as part of the transaction operation
+            operationContext.replicationLineUp();
+         }
+      }
+
+      if (tx != null) {
+         MirrorACKOperation operation = getAckOperation(tx);
+         // notice the operationContext.replicationLineUp is done on 
beforeCommit as part of the TX
+         operation.addMessage(messageCommand, ref);
+      } else {
+         server.getStorageManager().afterStoreOperations(new IOCallback() {
+            @Override
+            public void done() {
+               try {
+                  logger.debug("Routing ack out of message {}", ref);
+                  route(server, messageCommand);
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+            }
+
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+            }
+         });
+      }
+   }
+
+   private MirrorACKOperation getAckOperation(Transaction tx) {
+      MirrorACKOperation ackOperation = (MirrorACKOperation) 
tx.getProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION);
+      if (ackOperation == null) {
+         logger.trace("getAckOperation::setting operation on transaction {}", 
tx);
+         ackOperation = new MirrorACKOperation(server);
+         tx.putProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION, 
ackOperation);
+         tx.afterStore(ackOperation);
+      }
+
+      return ackOperation;
+   }
+
+   private MirrorSendOperation getSendOperation(Transaction tx) {
+      if (tx == null) {
+         return null;
+      }
+      MirrorSendOperation sendOperation = (MirrorSendOperation) 
tx.getProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION);
+      if (sendOperation == null) {
+         logger.trace("getSendOperation::setting operation on transaction {}", 
tx);
+         sendOperation = new MirrorSendOperation();
+         tx.putProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION, 
sendOperation);
+         tx.afterStore(sendOperation);
+      }
+
+      return sendOperation;
+   }
+
+   private static class MirrorACKOperation extends 
TransactionOperationAbstract {
+
+      final ActiveMQServer server;
+
+      final LinkedList<MessageReference> refs = new LinkedList<>();
+      final LinkedList<Message> messages = new LinkedList<>();
+
+      MirrorACKOperation(ActiveMQServer server) {
+         this.server = server;
+      }
+
+      /**
+       *
+       * @param message the message with the instruction to ack on the target 
node. Notice this is not the message owned by the reference.
+       * @param ref the reference being acked
+       */
+      public void addMessage(Message message, MessageReference ref) {
+         refs.add(ref);
+         messages.add(message);
+      }
+
+      @Override
+      public void beforeCommit(Transaction tx) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("MirrorACKOperation::beforeCommit processing {}", 
messages);
+         }
+         for (Message message : messages) {
+            OperationContext context = (OperationContext) 
message.getUserContext(OperationContext.class);
+            if (context != null) {
+               context.replicationLineUp();
+            }
+         }
+      }
+
+      @Override
+      public void afterCommit(Transaction tx) {
+         if (logger.isDebugEnabled()) {

Review Comment:
   Unneeded gate check



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -239,37 +249,107 @@ public void sendMessage(Message message, RoutingContext 
context, List<MessageRef
       try {
          context.setReusable(false);
 
-         MessageReference ref = 
MessageReference.Factory.createReference(message, snfQueue);
-         String nodeID = setProtocolData(idSupplier, ref);
+         String nodeID = idSupplier.getServerID(message);
+
          if (nodeID != null && nodeID.equals(getRemoteMirrorId())) {
             logger.trace("Message {} already belonged to the node, {}, it 
won't circle send", message, getRemoteMirrorId());
             return;
          }
+
+         MessageReference ref = 
MessageReference.Factory.createReference(message, snfQueue);
+         setProtocolData(ref, nodeID, idSupplier.getID(ref));
+
          snfQueue.refUp(ref);
+         List<MessageReference> refs;
+
+         if (tx != null) {
+            if (logger.isDebugEnabled()) {

Review Comment:
   Remove the is-enabled gate checks, as they are useless unless there are more 
than two args, but also fix the logger call to use parameterized `{}` 
formatting instead of string concatenation, to avoid the unnecessary toString



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -303,8 +379,18 @@ private static Properties getProperties(Message message) {
       }
    }
 
+   private void postACKInternalMessage(MessageReference reference) {
+      if (logger.isDebugEnabled()) {
+         logger.debug("PostAckInternal::server={}, ref={}", 
server.getIdentity(), reference);
+      }
+      if (sync) {
+         syncDone(reference);
+      }
+   }
+
    @Override
-   public void postAcknowledge(MessageReference ref, AckReason reason) throws 
Exception {
+   public void preAcknowledge(final Transaction tx, final MessageReference 
ref, final AckReason reason) throws Exception {
+      logger.trace("preAcknowledge tx={}, ref={}, reason={}", tx, ref, reason);

Review Comment:
   This one actually could use a gate check to avoid the varargs Object[] 
allocation, since the call passes three arguments



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -318,28 +404,185 @@ public void postAcknowledge(MessageReference ref, 
AckReason reason) throws Excep
 
       if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() || 
ref.getQueue().isMirrorController()))) {
          if (logger.isDebugEnabled()) {
-            logger.debug("{} rejecting postAcknowledge queue={}, ref={} to 
avoid infinite loop with the mirror (reflection)", server, 
ref.getQueue().getName(), ref);
+            logger.debug("{} rejecting preAcknowledge queue={}, ref={} to 
avoid infinite loop with the mirror (reflection)", server, 
ref.getQueue().getName(), ref);
          }
          return;
       }
 
       if (ignoreAddress(ref.getQueue().getAddress())) {
          if (logger.isTraceEnabled()) {
-            logger.trace("{} rejecting postAcknowledge queue={}, ref={}, queue 
address is excluded", server, ref.getQueue().getName(), ref);
+            logger.trace("{} rejecting preAcknowledge queue={}, ref={}, queue 
address is excluded", server, ref.getQueue().getName(), ref);
          }
          return;
       }
 
-      logger.trace("{} postAcknowledge {}", server, ref);
+      logger.trace("{} preAcknowledge {}", server, ref);
 
       String nodeID = idSupplier.getServerID(ref); // notice the brokerID will 
be null for any message generated on this broker.
       long internalID = idSupplier.getID(ref);
-      if (logger.isTraceEnabled()) {
-         logger.trace("{} sending ack message from server {} with 
messageID={}", server, nodeID, internalID);
+      Message messageCommand = createMessage(ref.getQueue().getAddress(), 
ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
+      if (sync) {
+         OperationContext operationContext;
+         operationContext = 
OperationContextImpl.getContext(server.getExecutorFactory());
+         messageCommand.setUserContext(OperationContext.class, 
operationContext);
+         if (tx == null) {
+            // notice that if transactional, the context is lined up on 
beforeCommit as part of the transaction operation
+            operationContext.replicationLineUp();
+         }
+      }
+
+      if (tx != null) {
+         MirrorACKOperation operation = getAckOperation(tx);
+         // notice the operationContext.replicationLineUp is done on 
beforeCommit as part of the TX
+         operation.addMessage(messageCommand, ref);
+      } else {
+         server.getStorageManager().afterStoreOperations(new IOCallback() {
+            @Override
+            public void done() {
+               try {
+                  logger.debug("Routing ack out of message {}", ref);
+                  route(server, messageCommand);
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+            }
+
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+            }
+         });
+      }
+   }
+
+   private MirrorACKOperation getAckOperation(Transaction tx) {
+      MirrorACKOperation ackOperation = (MirrorACKOperation) 
tx.getProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION);
+      if (ackOperation == null) {
+         logger.trace("getAckOperation::setting operation on transaction {}", 
tx);
+         ackOperation = new MirrorACKOperation(server);
+         tx.putProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION, 
ackOperation);
+         tx.afterStore(ackOperation);
+      }
+
+      return ackOperation;
+   }
+
+   private MirrorSendOperation getSendOperation(Transaction tx) {
+      if (tx == null) {
+         return null;
+      }
+      MirrorSendOperation sendOperation = (MirrorSendOperation) 
tx.getProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION);
+      if (sendOperation == null) {
+         logger.trace("getSendOperation::setting operation on transaction {}", 
tx);
+         sendOperation = new MirrorSendOperation();
+         tx.putProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION, 
sendOperation);
+         tx.afterStore(sendOperation);
+      }
+
+      return sendOperation;
+   }
+
+   private static class MirrorACKOperation extends 
TransactionOperationAbstract {
+
+      final ActiveMQServer server;
+
+      final LinkedList<MessageReference> refs = new LinkedList<>();
+      final LinkedList<Message> messages = new LinkedList<>();
+
+      MirrorACKOperation(ActiveMQServer server) {
+         this.server = server;
+      }
+
+      /**
+       *
+       * @param message the message with the instruction to ack on the target 
node. Notice this is not the message owned by the reference.
+       * @param ref the reference being acked
+       */
+      public void addMessage(Message message, MessageReference ref) {
+         refs.add(ref);
+         messages.add(message);
+      }
+
+      @Override
+      public void beforeCommit(Transaction tx) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("MirrorACKOperation::beforeCommit processing {}", 
messages);
+         }
+         for (Message message : messages) {
+            OperationContext context = (OperationContext) 
message.getUserContext(OperationContext.class);
+            if (context != null) {
+               context.replicationLineUp();
+            }
+         }
+      }
+
+      @Override
+      public void afterCommit(Transaction tx) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("MirrorACKOperation::afterCommit processing {}", 
messages);
+         }
+         for (Message message : messages) {
+            try {
+               route(server, message);
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+            }
+         }
+         for (MessageReference ref : refs) {
+            ref.getMessage().usageDown();
+         }
+      }
+
+      @Override
+      public void afterRollback(Transaction tx) {
+         for (Message message : messages) {
+            OperationContext context = (OperationContext) 
message.getUserContext(OperationContext.class);
+            context.replicationDone();
+         }
+      }
+
+   }
+
+   private static final class MirrorSendOperation extends 
TransactionOperationAbstract {
+      final List<MessageReference> mirroredRefs = new LinkedList<>();
+
+      @Override
+      public void beforeCommit(Transaction tx) {
+         for (MessageReference ref : mirroredRefs) {
+            OperationContext context = 
ref.getProtocolData(OperationContext.class);
+            if (context != null) {
+               context.replicationLineUp();
+            }
+         }
+      }
+
+      @Override
+      public void afterRollback(Transaction tx) {
+         if (logger.isDebugEnabled()) {

Review Comment:
   Unneeded gate check



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -318,28 +404,185 @@ public void postAcknowledge(MessageReference ref, 
AckReason reason) throws Excep
 
       if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() || 
ref.getQueue().isMirrorController()))) {
          if (logger.isDebugEnabled()) {
-            logger.debug("{} rejecting postAcknowledge queue={}, ref={} to 
avoid infinite loop with the mirror (reflection)", server, 
ref.getQueue().getName(), ref);
+            logger.debug("{} rejecting preAcknowledge queue={}, ref={} to 
avoid infinite loop with the mirror (reflection)", server, 
ref.getQueue().getName(), ref);
          }
          return;
       }
 
       if (ignoreAddress(ref.getQueue().getAddress())) {
          if (logger.isTraceEnabled()) {
-            logger.trace("{} rejecting postAcknowledge queue={}, ref={}, queue 
address is excluded", server, ref.getQueue().getName(), ref);
+            logger.trace("{} rejecting preAcknowledge queue={}, ref={}, queue 
address is excluded", server, ref.getQueue().getName(), ref);
          }
          return;
       }
 
-      logger.trace("{} postAcknowledge {}", server, ref);
+      logger.trace("{} preAcknowledge {}", server, ref);
 
       String nodeID = idSupplier.getServerID(ref); // notice the brokerID will 
be null for any message generated on this broker.
       long internalID = idSupplier.getID(ref);
-      if (logger.isTraceEnabled()) {
-         logger.trace("{} sending ack message from server {} with 
messageID={}", server, nodeID, internalID);
+      Message messageCommand = createMessage(ref.getQueue().getAddress(), 
ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
+      if (sync) {
+         OperationContext operationContext;
+         operationContext = 
OperationContextImpl.getContext(server.getExecutorFactory());
+         messageCommand.setUserContext(OperationContext.class, 
operationContext);
+         if (tx == null) {
+            // notice that if transactional, the context is lined up on 
beforeCommit as part of the transaction operation
+            operationContext.replicationLineUp();
+         }
+      }
+
+      if (tx != null) {
+         MirrorACKOperation operation = getAckOperation(tx);
+         // notice the operationContext.replicationLineUp is done on 
beforeCommit as part of the TX
+         operation.addMessage(messageCommand, ref);
+      } else {
+         server.getStorageManager().afterStoreOperations(new IOCallback() {
+            @Override
+            public void done() {
+               try {
+                  logger.debug("Routing ack out of message {}", ref);
+                  route(server, messageCommand);
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+            }
+
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+            }
+         });
+      }
+   }
+
+   private MirrorACKOperation getAckOperation(Transaction tx) {
+      MirrorACKOperation ackOperation = (MirrorACKOperation) 
tx.getProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION);
+      if (ackOperation == null) {
+         logger.trace("getAckOperation::setting operation on transaction {}", 
tx);
+         ackOperation = new MirrorACKOperation(server);
+         tx.putProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION, 
ackOperation);
+         tx.afterStore(ackOperation);
+      }
+
+      return ackOperation;
+   }
+
+   private MirrorSendOperation getSendOperation(Transaction tx) {
+      if (tx == null) {
+         return null;
+      }
+      MirrorSendOperation sendOperation = (MirrorSendOperation) 
tx.getProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION);
+      if (sendOperation == null) {
+         logger.trace("getSendOperation::setting operation on transaction {}", 
tx);
+         sendOperation = new MirrorSendOperation();
+         tx.putProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION, 
sendOperation);
+         tx.afterStore(sendOperation);
+      }
+
+      return sendOperation;
+   }
+
+   private static class MirrorACKOperation extends 
TransactionOperationAbstract {
+
+      final ActiveMQServer server;
+
+      final LinkedList<MessageReference> refs = new LinkedList<>();
+      final LinkedList<Message> messages = new LinkedList<>();
+
+      MirrorACKOperation(ActiveMQServer server) {
+         this.server = server;
+      }
+
+      /**
+       *
+       * @param message the message with the instruction to ack on the target 
node. Notice this is not the message owned by the reference.
+       * @param ref the reference being acked
+       */
+      public void addMessage(Message message, MessageReference ref) {
+         refs.add(ref);
+         messages.add(message);
+      }
+
+      @Override
+      public void beforeCommit(Transaction tx) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("MirrorACKOperation::beforeCommit processing {}", 
messages);
+         }
+         for (Message message : messages) {
+            OperationContext context = (OperationContext) 
message.getUserContext(OperationContext.class);
+            if (context != null) {
+               context.replicationLineUp();
+            }
+         }
+      }
+
+      @Override
+      public void afterCommit(Transaction tx) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("MirrorACKOperation::afterCommit processing {}", 
messages);
+         }
+         for (Message message : messages) {
+            try {
+               route(server, message);
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+            }
+         }
+         for (MessageReference ref : refs) {
+            ref.getMessage().usageDown();
+         }
+      }
+
+      @Override
+      public void afterRollback(Transaction tx) {
+         for (Message message : messages) {
+            OperationContext context = (OperationContext) 
message.getUserContext(OperationContext.class);
+            context.replicationDone();
+         }
+      }
+
+   }
+
+   private static final class MirrorSendOperation extends 
TransactionOperationAbstract {
+      final List<MessageReference> mirroredRefs = new LinkedList<>();
+
+      @Override
+      public void beforeCommit(Transaction tx) {
+         for (MessageReference ref : mirroredRefs) {
+            OperationContext context = 
ref.getProtocolData(OperationContext.class);
+            if (context != null) {
+               context.replicationLineUp();
+            }
+         }
+      }
+
+      @Override
+      public void afterRollback(Transaction tx) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("rolling back context for {} times", 
mirroredRefs.size());
+         }
+         for (MessageReference mirroredRef : mirroredRefs) {
+            OperationContext localCTX = 
mirroredRef.getProtocolData(OperationContext.class);
+            if (localCTX != null) {
+               localCTX.replicationDone();
+            }
+         }
+      }
+
+      @Override
+      public void afterCommit(Transaction tx) {
+         if (logger.isTraceEnabled()) {

Review Comment:
   Ditto: unneeded gate check



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java:
##########
@@ -128,11 +133,17 @@ public void storeLineUp() {
    @Override
    public void replicationLineUp() {
       REPLICATION_LINEUP_UPDATER.incrementAndGet(this);
+      if (logger.isTraceEnabled()) {

Review Comment:
   Unneeded gate check



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSyncMirrorTest.java:
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.JournalUpdateCallback;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import 
org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
+import 
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQPSyncMirrorTest extends AmqpClientTestSupport {
+
+   Logger logger = LoggerFactory.getLogger(AMQPSyncMirrorTest.class);
+
+   private static final String SLOW_SERVER_NAME = "slow";
+   private static final int SLOW_SERVER_PORT = AMQP_PORT + 1;
+
+   private ActiveMQServer slowServer;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+   }
+
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,OPENWIRE,CORE";
+   }
+
+   @Test
+   public void testPersistedSendAMQP() throws Exception {
+      testPersistedSend("AMQP", false, 100);
+   }
+
+   @Test
+   public void testPersistedSendAMQPLarge() throws Exception {
+      testPersistedSend("AMQP", false, 200 * 1024);
+   }
+
+
+   @Test
+   public void testPersistedSendCore() throws Exception {
+      testPersistedSend("CORE", false, 100);
+   }
+
+   @Test
+   public void testPersistedSendCoreLarge() throws Exception {
+      testPersistedSend("CORE", false, 200 * 1024);
+   }
+
+   @Test
+   public void testPersistedSendAMQPTXLarge() throws Exception {
+      testPersistedSend("AMQP", true, 200 * 1024);
+   }
+
+   @Test
+   public void testPersistedSendAMQPTX() throws Exception {
+      testPersistedSend("AMQP", true, 100);
+   }
+
+   @Test
+   public void testPersistedSendCoreTX() throws Exception {
+      testPersistedSend("CORE", true, 100);
+   }
+
+   @Test
+   public void testPersistedSendCoreTXLarge() throws Exception {
+      testPersistedSend("CORE", true, 200 * 1024);
+   }
+
+   private void testPersistedSend(String protocol, boolean transactional, int 
messageSize) throws Exception {
+      ReusableLatch sendPending = new ReusableLatch(0);
+      Semaphore semSend = new Semaphore(1);
+      Semaphore semAck = new Semaphore(1);
+      AtomicInteger errors = new AtomicInteger(0);
+
+      try {
+         final int NUMBER_OF_MESSAGES = 10;
+
+         AtomicInteger countStored = new AtomicInteger(0);
+
+         slowServer = createServerWithCallbackStorage(SLOW_SERVER_PORT, 
SLOW_SERVER_NAME, (isUpdate, isTX, txId, id, recordType, persister, record) -> {
+            if (logger.isDebugEnabled()) {
+               logger.debug("StorageCallback::slow isUpdate={}, isTX={}, 
txID={}, id={},recordType={}, record={}", isUpdate, isTX, txId, id, recordType, 
record);
+            }
+            if (transactional) {
+               if (isTX) {
+                  try {
+                     if (countStored.get() > 0) {
+                        countStored.incrementAndGet();
+                        logger.trace("semSend.tryAcquire");
+                        if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
+                           logger.trace("acquired TX, now release");
+                           semSend.release();
+                        }
+                     }
+                  } catch (Exception e) {
+                     e.printStackTrace();
+                  }
+               }
+            }
+            if (recordType == JournalRecordIds.ACKNOWLEDGE_REF) {
+               logger.debug("slow ACK REF");
+               try {
+                  if (semAck.tryAcquire(20, TimeUnit.SECONDS)) {
+                     semAck.release();
+                     logger.trace("slow acquired ACK semaphore");
+                  } else {
+                     logger.trace("Semaphore wasn't acquired");
+                  }
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+            }
+            if (recordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL) {
+               try {
+                  countStored.incrementAndGet();
+                  if (!transactional) {
+                     logger.trace("semSend.tryAcquire");
+                     if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
+                        logger.trace("acquired non TX now release");
+                        semSend.release();
+                     }
+                  }
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+                  errors.incrementAndGet();
+               }
+            }
+         });
+         slowServer.setIdentity("slowServer");
+         server.setIdentity("server");
+
+         ExecutorService pool = Executors.newFixedThreadPool(5);
+         runAfter(pool::shutdown);
+
+         AMQPMirrorBrokerConnectionElement replication = 
configureMirrorTowardsSlow(server);
+
+         slowServer.getConfiguration().setName("slow");
+         server.getConfiguration().setName("fast");
+         slowServer.start();
+         server.start();
+
+         waitForServerToStart(slowServer);
+         waitForServerToStart(server);
+
+         server.addAddressInfo(new 
AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+         server.createQueue(new 
QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
+
+         Wait.waitFor(() -> slowServer.locateQueue(getQueueName()) != null);
+         Queue replicatedQueue = slowServer.locateQueue(getQueueName());
+
+         ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + AMQP_PORT);
+
+         if (factory instanceof ActiveMQConnectionFactory) {
+            ((ActiveMQConnectionFactory) 
factory).getServerLocator().setBlockOnAcknowledge(true);
+         }
+
+         Connection connection = factory.createConnection();
+         runAfter(connection::close);
+         Session session = connection.createSession(transactional, 
transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
+         MessageProducer producer = 
session.createProducer(session.createQueue(getQueueName()));
+
+         connection.start();
+
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         final String bodyMessage;
+         {
+            StringBuffer buffer = new StringBuffer();
+            for (int i = 0; i < messageSize; i++) {
+               buffer.append("large Buffer...");
+            }
+            bodyMessage = buffer.toString();
+         }
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            if (logger.isTraceEnabled()) {

Review Comment:
   Unneeded gate check



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -239,37 +249,107 @@ public void sendMessage(Message message, RoutingContext 
context, List<MessageRef
       try {
          context.setReusable(false);
 
-         MessageReference ref = 
MessageReference.Factory.createReference(message, snfQueue);
-         String nodeID = setProtocolData(idSupplier, ref);
+         String nodeID = idSupplier.getServerID(message);
+
          if (nodeID != null && nodeID.equals(getRemoteMirrorId())) {
             logger.trace("Message {} already belonged to the node, {}, it 
won't circle send", message, getRemoteMirrorId());
             return;
          }
+
+         MessageReference ref = 
MessageReference.Factory.createReference(message, snfQueue);
+         setProtocolData(ref, nodeID, idSupplier.getID(ref));
+
          snfQueue.refUp(ref);
+         List<MessageReference> refs;
+
+         if (tx != null) {
+            if (logger.isDebugEnabled()) {
+               logger.debug("Mirroring Message " + message + " with TX");
+            }
+            MirrorSendOperation mirrorSendOperation = getSendOperation(tx);
+            refs = mirrorSendOperation.mirroredRefs;
+         } else {
+            refs = new LinkedList<>();
+         }
+
          refs.add(ref);
 
+         if (sync) {
+            OperationContext operContext = 
OperationContextImpl.getContext(server.getExecutorFactory());
+            if (tx == null) {
+               // notice that if transactional, the context is lined up on 
beforeCommit as part of the transaction operation
+               operContext.replicationLineUp();
+            }
+            if (logger.isDebugEnabled()) {
+               logger.debug("mirror syncUp context={}, ref={}", operContext, 
ref);
+            }
+            ref.setProtocolData(OperationContext.class, operContext);
+         }
+
          if (message.isDurable() && snfQueue.isDurable()) {
             PostOfficeImpl.storeDurableReference(server.getStorageManager(), 
message, context.getTransaction(), snfQueue, true);
          }
 
+         if (tx == null) {
+            server.getStorageManager().afterStoreOperations(new IOCallback() {
+               @Override
+               public void done() {
+                  PostOfficeImpl.processReferences(refs, false);
+               }
+
+               @Override
+               public void onError(int errorCode, String errorMessage) {
+               }
+            });
+         }
       } catch (Throwable e) {
          logger.warn(e.getMessage(), e);
       }
    }
 
+   private void syncDone(MessageReference reference) {
+      OperationContext ctx = reference.getProtocolData(OperationContext.class);
+      if (ctx != null) {
+         ctx.replicationDone();
+         if (logger.isDebugEnabled()) {

Review Comment:
   Remove gate check.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -239,37 +249,107 @@ public void sendMessage(Message message, RoutingContext 
context, List<MessageRef
       try {
          context.setReusable(false);
 
-         MessageReference ref = 
MessageReference.Factory.createReference(message, snfQueue);
-         String nodeID = setProtocolData(idSupplier, ref);
+         String nodeID = idSupplier.getServerID(message);
+
          if (nodeID != null && nodeID.equals(getRemoteMirrorId())) {
             logger.trace("Message {} already belonged to the node, {}, it 
won't circle send", message, getRemoteMirrorId());
             return;
          }
+
+         MessageReference ref = 
MessageReference.Factory.createReference(message, snfQueue);
+         setProtocolData(ref, nodeID, idSupplier.getID(ref));
+
          snfQueue.refUp(ref);
+         List<MessageReference> refs;
+
+         if (tx != null) {
+            if (logger.isDebugEnabled()) {
+               logger.debug("Mirroring Message " + message + " with TX");
+            }
+            MirrorSendOperation mirrorSendOperation = getSendOperation(tx);
+            refs = mirrorSendOperation.mirroredRefs;
+         } else {
+            refs = new LinkedList<>();

Review Comment:
   Seems like this could be made a Collections.singletonList instead of a 
LinkedList, given that there is only ever going to be one entry from what I can 
see; you would just need to move refs.add up into the other if block.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java:
##########
@@ -128,11 +133,17 @@ public void storeLineUp() {
    @Override
    public void replicationLineUp() {
       REPLICATION_LINEUP_UPDATER.incrementAndGet(this);
+      if (logger.isTraceEnabled()) {
+         logger.trace("replicationLineUp:: {}", replicationLineUpField);
+      }
    }
 
    @Override
    public synchronized void replicationDone() {
       replicated++;
+      if (logger.isTraceEnabled()) {

Review Comment:
   Unneeded gate check



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -303,8 +379,18 @@ private static Properties getProperties(Message message) {
       }
    }
 
+   private void postACKInternalMessage(MessageReference reference) {
+      if (logger.isDebugEnabled()) {

Review Comment:
   Unnecessary gate check



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -107,6 +115,7 @@ public boolean isStarted() {
    public AMQPMirrorControllerSource(ProtonProtocolManager 
protonProtocolManager, Queue snfQueue, ActiveMQServer server, 
AMQPMirrorBrokerConnectionElement replicaConfig,
                                      AMQPBrokerConnection brokerConnection) {
       super(server);
+      assert server != null;

Review Comment:
   Generally, in a constructor I'd rather the contract be explicitly checked 
with Objects.requireNonNull(x) than with an assert, since assertions are 
disabled at runtime unless the JVM is started with -ea.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSyncMirrorTest.java:
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.JournalUpdateCallback;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import 
org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
+import 
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQPSyncMirrorTest extends AmqpClientTestSupport {
+
+   Logger logger = LoggerFactory.getLogger(AMQPSyncMirrorTest.class);
+
+   private static final String SLOW_SERVER_NAME = "slow";
+   private static final int SLOW_SERVER_PORT = AMQP_PORT + 1;
+
+   private ActiveMQServer slowServer;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+   }
+
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,OPENWIRE,CORE";
+   }
+
+   @Test
+   public void testPersistedSendAMQP() throws Exception {
+      testPersistedSend("AMQP", false, 100);
+   }
+
+   @Test
+   public void testPersistedSendAMQPLarge() throws Exception {
+      testPersistedSend("AMQP", false, 200 * 1024);
+   }
+
+
+   @Test
+   public void testPersistedSendCore() throws Exception {
+      testPersistedSend("CORE", false, 100);
+   }
+
+   @Test
+   public void testPersistedSendCoreLarge() throws Exception {
+      testPersistedSend("CORE", false, 200 * 1024);
+   }
+
+   @Test
+   public void testPersistedSendAMQPTXLarge() throws Exception {
+      testPersistedSend("AMQP", true, 200 * 1024);
+   }
+
+   @Test
+   public void testPersistedSendAMQPTX() throws Exception {
+      testPersistedSend("AMQP", true, 100);
+   }
+
+   @Test
+   public void testPersistedSendCoreTX() throws Exception {
+      testPersistedSend("CORE", true, 100);
+   }
+
+   @Test
+   public void testPersistedSendCoreTXLarge() throws Exception {
+      testPersistedSend("CORE", true, 200 * 1024);
+   }
+
+   private void testPersistedSend(String protocol, boolean transactional, int 
messageSize) throws Exception {
+      ReusableLatch sendPending = new ReusableLatch(0);
+      Semaphore semSend = new Semaphore(1);
+      Semaphore semAck = new Semaphore(1);
+      AtomicInteger errors = new AtomicInteger(0);
+
+      try {
+         final int NUMBER_OF_MESSAGES = 10;
+
+         AtomicInteger countStored = new AtomicInteger(0);
+
+         slowServer = createServerWithCallbackStorage(SLOW_SERVER_PORT, 
SLOW_SERVER_NAME, (isUpdate, isTX, txId, id, recordType, persister, record) -> {
+            if (logger.isDebugEnabled()) {
+               logger.debug("StorageCallback::slow isUpdate={}, isTX={}, 
txID={}, id={},recordType={}, record={}", isUpdate, isTX, txId, id, recordType, 
record);
+            }
+            if (transactional) {
+               if (isTX) {
+                  try {
+                     if (countStored.get() > 0) {
+                        countStored.incrementAndGet();
+                        logger.trace("semSend.tryAcquire");
+                        if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
+                           logger.trace("acquired TX, now release");
+                           semSend.release();
+                        }
+                     }
+                  } catch (Exception e) {
+                     e.printStackTrace();
+                  }
+               }
+            }
+            if (recordType == JournalRecordIds.ACKNOWLEDGE_REF) {
+               logger.debug("slow ACK REF");
+               try {
+                  if (semAck.tryAcquire(20, TimeUnit.SECONDS)) {
+                     semAck.release();
+                     logger.trace("slow acquired ACK semaphore");
+                  } else {
+                     logger.trace("Semaphore wasn't acquired");
+                  }
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+            }
+            if (recordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL) {
+               try {
+                  countStored.incrementAndGet();
+                  if (!transactional) {
+                     logger.trace("semSend.tryAcquire");
+                     if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
+                        logger.trace("acquired non TX now release");
+                        semSend.release();
+                     }
+                  }
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+                  errors.incrementAndGet();
+               }
+            }
+         });
+         slowServer.setIdentity("slowServer");
+         server.setIdentity("server");
+
+         ExecutorService pool = Executors.newFixedThreadPool(5);
+         runAfter(pool::shutdown);
+
+         AMQPMirrorBrokerConnectionElement replication = 
configureMirrorTowardsSlow(server);
+
+         slowServer.getConfiguration().setName("slow");
+         server.getConfiguration().setName("fast");
+         slowServer.start();
+         server.start();
+
+         waitForServerToStart(slowServer);
+         waitForServerToStart(server);
+
+         server.addAddressInfo(new 
AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+         server.createQueue(new 
QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
+
+         Wait.waitFor(() -> slowServer.locateQueue(getQueueName()) != null);
+         Queue replicatedQueue = slowServer.locateQueue(getQueueName());
+
+         ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + AMQP_PORT);
+
+         if (factory instanceof ActiveMQConnectionFactory) {
+            ((ActiveMQConnectionFactory) 
factory).getServerLocator().setBlockOnAcknowledge(true);
+         }
+
+         Connection connection = factory.createConnection();
+         runAfter(connection::close);
+         Session session = connection.createSession(transactional, 
transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
+         MessageProducer producer = 
session.createProducer(session.createQueue(getQueueName()));
+
+         connection.start();
+
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         final String bodyMessage;
+         {
+            StringBuffer buffer = new StringBuffer();
+            for (int i = 0; i < messageSize; i++) {
+               buffer.append("large Buffer...");
+            }
+            bodyMessage = buffer.toString();
+         }
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            if (logger.isTraceEnabled()) {
+               logger.trace("===>>> send message {}", i);
+            }
+            int theI = i;
+            sendPending.countUp();
+            logger.trace("semSend.acquire");
+            semSend.acquire();
+            if (!transactional) {
+               pool.execute(() -> {
+                  try {
+                     if (logger.isTraceEnabled()) {
+                        logger.trace("Entering non TX send with sendPending = 
" + sendPending.getCount());
+                     }
+                     TextMessage message = 
session.createTextMessage(bodyMessage);
+                     message.setStringProperty("strProperty", "" + theI);
+                     producer.send(message);
+                     sendPending.countDown();
+                     if (logger.isTraceEnabled()) {

Review Comment:
   Same as above: the gate check is unnecessary here.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSyncMirrorTest.java:
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.JournalUpdateCallback;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import 
org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
+import 
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQPSyncMirrorTest extends AmqpClientTestSupport {
+
+   Logger logger = LoggerFactory.getLogger(AMQPSyncMirrorTest.class);
+
+   private static final String SLOW_SERVER_NAME = "slow";
+   private static final int SLOW_SERVER_PORT = AMQP_PORT + 1;
+
+   private ActiveMQServer slowServer;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+   }
+
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,OPENWIRE,CORE";
+   }
+
+   @Test
+   public void testPersistedSendAMQP() throws Exception {
+      testPersistedSend("AMQP", false, 100);
+   }
+
+   @Test
+   public void testPersistedSendAMQPLarge() throws Exception {
+      testPersistedSend("AMQP", false, 200 * 1024);
+   }
+
+
+   @Test
+   public void testPersistedSendCore() throws Exception {
+      testPersistedSend("CORE", false, 100);
+   }
+
+   @Test
+   public void testPersistedSendCoreLarge() throws Exception {
+      testPersistedSend("CORE", false, 200 * 1024);
+   }
+
+   @Test
+   public void testPersistedSendAMQPTXLarge() throws Exception {
+      testPersistedSend("AMQP", true, 200 * 1024);
+   }
+
+   @Test
+   public void testPersistedSendAMQPTX() throws Exception {
+      testPersistedSend("AMQP", true, 100);
+   }
+
+   @Test
+   public void testPersistedSendCoreTX() throws Exception {
+      testPersistedSend("CORE", true, 100);
+   }
+
+   @Test
+   public void testPersistedSendCoreTXLarge() throws Exception {
+      testPersistedSend("CORE", true, 200 * 1024);
+   }
+
+   private void testPersistedSend(String protocol, boolean transactional, int 
messageSize) throws Exception {
+      ReusableLatch sendPending = new ReusableLatch(0);
+      Semaphore semSend = new Semaphore(1);
+      Semaphore semAck = new Semaphore(1);
+      AtomicInteger errors = new AtomicInteger(0);
+
+      try {
+         final int NUMBER_OF_MESSAGES = 10;
+
+         AtomicInteger countStored = new AtomicInteger(0);
+
+         slowServer = createServerWithCallbackStorage(SLOW_SERVER_PORT, 
SLOW_SERVER_NAME, (isUpdate, isTX, txId, id, recordType, persister, record) -> {
+            if (logger.isDebugEnabled()) {
+               logger.debug("StorageCallback::slow isUpdate={}, isTX={}, 
txID={}, id={},recordType={}, record={}", isUpdate, isTX, txId, id, recordType, 
record);
+            }
+            if (transactional) {
+               if (isTX) {
+                  try {
+                     if (countStored.get() > 0) {
+                        countStored.incrementAndGet();
+                        logger.trace("semSend.tryAcquire");
+                        if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
+                           logger.trace("acquired TX, now release");
+                           semSend.release();
+                        }
+                     }
+                  } catch (Exception e) {
+                     e.printStackTrace();
+                  }
+               }
+            }
+            if (recordType == JournalRecordIds.ACKNOWLEDGE_REF) {
+               logger.debug("slow ACK REF");
+               try {
+                  if (semAck.tryAcquire(20, TimeUnit.SECONDS)) {
+                     semAck.release();
+                     logger.trace("slow acquired ACK semaphore");
+                  } else {
+                     logger.trace("Semaphore wasn't acquired");
+                  }
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+            }
+            if (recordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL) {
+               try {
+                  countStored.incrementAndGet();
+                  if (!transactional) {
+                     logger.trace("semSend.tryAcquire");
+                     if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
+                        logger.trace("acquired non TX now release");
+                        semSend.release();
+                     }
+                  }
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+                  errors.incrementAndGet();
+               }
+            }
+         });
+         slowServer.setIdentity("slowServer");
+         server.setIdentity("server");
+
+         ExecutorService pool = Executors.newFixedThreadPool(5);
+         runAfter(pool::shutdown);
+
+         AMQPMirrorBrokerConnectionElement replication = 
configureMirrorTowardsSlow(server);
+
+         slowServer.getConfiguration().setName("slow");
+         server.getConfiguration().setName("fast");
+         slowServer.start();
+         server.start();
+
+         waitForServerToStart(slowServer);
+         waitForServerToStart(server);
+
+         server.addAddressInfo(new 
AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+         server.createQueue(new 
QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
+
+         Wait.waitFor(() -> slowServer.locateQueue(getQueueName()) != null);
+         Queue replicatedQueue = slowServer.locateQueue(getQueueName());
+
+         ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + AMQP_PORT);
+
+         if (factory instanceof ActiveMQConnectionFactory) {
+            ((ActiveMQConnectionFactory) 
factory).getServerLocator().setBlockOnAcknowledge(true);
+         }
+
+         Connection connection = factory.createConnection();
+         runAfter(connection::close);
+         Session session = connection.createSession(transactional, 
transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
+         MessageProducer producer = 
session.createProducer(session.createQueue(getQueueName()));
+
+         connection.start();
+
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         final String bodyMessage;
+         {
+            StringBuffer buffer = new StringBuffer();
+            for (int i = 0; i < messageSize; i++) {
+               buffer.append("large Buffer...");
+            }
+            bodyMessage = buffer.toString();
+         }
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            if (logger.isTraceEnabled()) {
+               logger.trace("===>>> send message {}", i);
+            }
+            int theI = i;
+            sendPending.countUp();
+            logger.trace("semSend.acquire");
+            semSend.acquire();
+            if (!transactional) {
+               pool.execute(() -> {
+                  try {
+                     if (logger.isTraceEnabled()) {

Review Comment:
   Wouldn't need a gate check here if the trace used parameterized formatting 
({} placeholders) instead of string concatenation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to