[ 
https://issues.apache.org/jira/browse/ARTEMIS-4136?focusedWorklogId=840411&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-840411
 ]

ASF GitHub Bot logged work on ARTEMIS-4136:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Jan/23 18:30
            Start Date: 19/Jan/23 18:30
    Worklog Time Spent: 10m 
      Work Description: tabish121 commented on code in PR #4338:
URL: https://github.com/apache/activemq-artemis/pull/4338#discussion_r1081606585


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -239,37 +249,107 @@ public void sendMessage(Message message, RoutingContext 
context, List<MessageRef
       try {
          context.setReusable(false);
 
-         MessageReference ref = 
MessageReference.Factory.createReference(message, snfQueue);
-         String nodeID = setProtocolData(idSupplier, ref);
+         String nodeID = idSupplier.getServerID(message);
+
          if (nodeID != null && nodeID.equals(getRemoteMirrorId())) {
             logger.trace("Message {} already belonged to the node, {}, it 
won't circle send", message, getRemoteMirrorId());
             return;
          }
+
+         MessageReference ref = 
MessageReference.Factory.createReference(message, snfQueue);
+         setProtocolData(ref, nodeID, idSupplier.getID(ref));
+
          snfQueue.refUp(ref);
+         List<MessageReference> refs;
+
+         if (tx != null) {
+            if (logger.isDebugEnabled()) {
+               logger.debug("Mirroring Message " + message + " with TX");
+            }
+            MirrorSendOperation mirrorSendOperation = getSendOperation(tx);
+            refs = mirrorSendOperation.mirroredRefs;
+         } else {
+            refs = new LinkedList<>();
+         }
+
          refs.add(ref);
 
+         if (sync) {
+            OperationContext operContext = 
OperationContextImpl.getContext(server.getExecutorFactory());
+            if (tx == null) {
+               // notice that if transactional, the context is lined up on 
beforeCommit as part of the transaction operation
+               operContext.replicationLineUp();
+            }
+            if (logger.isDebugEnabled()) {

Review Comment:
   Gate check isn't needed here.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -318,28 +404,185 @@ public void postAcknowledge(MessageReference ref, 
AckReason reason) throws Excep
 
       if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() || 
ref.getQueue().isMirrorController()))) {
          if (logger.isDebugEnabled()) {
-            logger.debug("{} rejecting postAcknowledge queue={}, ref={} to 
avoid infinite loop with the mirror (reflection)", server, 
ref.getQueue().getName(), ref);
+            logger.debug("{} rejecting preAcknowledge queue={}, ref={} to 
avoid infinite loop with the mirror (reflection)", server, 
ref.getQueue().getName(), ref);
          }
          return;
       }
 
       if (ignoreAddress(ref.getQueue().getAddress())) {
          if (logger.isTraceEnabled()) {
-            logger.trace("{} rejecting postAcknowledge queue={}, ref={}, queue 
address is excluded", server, ref.getQueue().getName(), ref);
+            logger.trace("{} rejecting preAcknowledge queue={}, ref={}, queue 
address is excluded", server, ref.getQueue().getName(), ref);
          }
          return;
       }
 
-      logger.trace("{} postAcknowledge {}", server, ref);
+      logger.trace("{} preAcknowledge {}", server, ref);
 
       String nodeID = idSupplier.getServerID(ref); // notice the brokerID will 
be null for any message generated on this broker.
       long internalID = idSupplier.getID(ref);
-      if (logger.isTraceEnabled()) {
-         logger.trace("{} sending ack message from server {} with 
messageID={}", server, nodeID, internalID);
+      Message messageCommand = createMessage(ref.getQueue().getAddress(), 
ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
+      if (sync) {
+         OperationContext operationContext;
+         operationContext = 
OperationContextImpl.getContext(server.getExecutorFactory());
+         messageCommand.setUserContext(OperationContext.class, 
operationContext);
+         if (tx == null) {
+            // notice that if transactional, the context is lined up on 
beforeCommit as part of the transaction operation
+            operationContext.replicationLineUp();
+         }
+      }
+
+      if (tx != null) {
+         MirrorACKOperation operation = getAckOperation(tx);
+         // notice the operationContext.replicationLineUp is done on 
beforeCommit as part of the TX
+         operation.addMessage(messageCommand, ref);
+      } else {
+         server.getStorageManager().afterStoreOperations(new IOCallback() {
+            @Override
+            public void done() {
+               try {
+                  logger.debug("Routing ack out of message {}", ref);
+                  route(server, messageCommand);
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+            }
+
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+            }
+         });
+      }
+   }
+
+   private MirrorACKOperation getAckOperation(Transaction tx) {
+      MirrorACKOperation ackOperation = (MirrorACKOperation) 
tx.getProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION);
+      if (ackOperation == null) {
+         logger.trace("getAckOperation::setting operation on transaction {}", 
tx);
+         ackOperation = new MirrorACKOperation(server);
+         tx.putProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION, 
ackOperation);
+         tx.afterStore(ackOperation);
+      }
+
+      return ackOperation;
+   }
+
+   private MirrorSendOperation getSendOperation(Transaction tx) {
+      if (tx == null) {
+         return null;
+      }
+      MirrorSendOperation sendOperation = (MirrorSendOperation) 
tx.getProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION);
+      if (sendOperation == null) {
+         logger.trace("getSendOperation::setting operation on transaction {}", 
tx);
+         sendOperation = new MirrorSendOperation();
+         tx.putProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION, 
sendOperation);
+         tx.afterStore(sendOperation);
+      }
+
+      return sendOperation;
+   }
+
+   private static class MirrorACKOperation extends 
TransactionOperationAbstract {
+
+      final ActiveMQServer server;
+
+      final LinkedList<MessageReference> refs = new LinkedList<>();
+      final LinkedList<Message> messages = new LinkedList<>();
+
+      MirrorACKOperation(ActiveMQServer server) {
+         this.server = server;
+      }
+
+      /**
+       *
+       * @param message the message with the instruction to ack on the target 
node. Notice this is not the message owned by the reference.
+       * @param ref the reference being acked
+       */
+      public void addMessage(Message message, MessageReference ref) {
+         refs.add(ref);
+         messages.add(message);
+      }
+
+      @Override
+      public void beforeCommit(Transaction tx) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("MirrorACKOperation::beforeCommit processing {}", 
messages);
+         }
+         for (Message message : messages) {
+            OperationContext context = (OperationContext) 
message.getUserContext(OperationContext.class);
+            if (context != null) {
+               context.replicationLineUp();
+            }
+         }
+      }
+
+      @Override
+      public void afterCommit(Transaction tx) {
+         if (logger.isDebugEnabled()) {

Review Comment:
   Unneeded gate check



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -239,37 +249,107 @@ public void sendMessage(Message message, RoutingContext 
context, List<MessageRef
       try {
          context.setReusable(false);
 
-         MessageReference ref = 
MessageReference.Factory.createReference(message, snfQueue);
-         String nodeID = setProtocolData(idSupplier, ref);
+         String nodeID = idSupplier.getServerID(message);
+
          if (nodeID != null && nodeID.equals(getRemoteMirrorId())) {
             logger.trace("Message {} already belonged to the node, {}, it 
won't circle send", message, getRemoteMirrorId());
             return;
          }
+
+         MessageReference ref = 
MessageReference.Factory.createReference(message, snfQueue);
+         setProtocolData(ref, nodeID, idSupplier.getID(ref));
+
          snfQueue.refUp(ref);
+         List<MessageReference> refs;
+
+         if (tx != null) {
+            if (logger.isDebugEnabled()) {

Review Comment:
   Remove the isXxxEnabled() gate checks, as they are useless unless there are 
more than two args; also fix the logger call to use parameterized ({}) 
formatting instead of string concatenation, to avoid the unnecessary toString.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -303,8 +379,18 @@ private static Properties getProperties(Message message) {
       }
    }
 
+   private void postACKInternalMessage(MessageReference reference) {
+      if (logger.isDebugEnabled()) {
+         logger.debug("PostAckInternal::server={}, ref={}", 
server.getIdentity(), reference);
+      }
+      if (sync) {
+         syncDone(reference);
+      }
+   }
+
    @Override
-   public void postAcknowledge(MessageReference ref, AckReason reason) throws 
Exception {
+   public void preAcknowledge(final Transaction tx, final MessageReference 
ref, final AckReason reason) throws Exception {
+      logger.trace("preAcknowledge tx={}, ref={}, reason={}", tx, ref, reason);

Review Comment:
   This one actually could use a gate check to avoid the Object[] (varargs) 
allocation, since it passes three arguments.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -318,28 +404,185 @@ public void postAcknowledge(MessageReference ref, 
AckReason reason) throws Excep
 
       if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() || 
ref.getQueue().isMirrorController()))) {
          if (logger.isDebugEnabled()) {
-            logger.debug("{} rejecting postAcknowledge queue={}, ref={} to 
avoid infinite loop with the mirror (reflection)", server, 
ref.getQueue().getName(), ref);
+            logger.debug("{} rejecting preAcknowledge queue={}, ref={} to 
avoid infinite loop with the mirror (reflection)", server, 
ref.getQueue().getName(), ref);
          }
          return;
       }
 
       if (ignoreAddress(ref.getQueue().getAddress())) {
          if (logger.isTraceEnabled()) {
-            logger.trace("{} rejecting postAcknowledge queue={}, ref={}, queue 
address is excluded", server, ref.getQueue().getName(), ref);
+            logger.trace("{} rejecting preAcknowledge queue={}, ref={}, queue 
address is excluded", server, ref.getQueue().getName(), ref);
          }
          return;
       }
 
-      logger.trace("{} postAcknowledge {}", server, ref);
+      logger.trace("{} preAcknowledge {}", server, ref);
 
       String nodeID = idSupplier.getServerID(ref); // notice the brokerID will 
be null for any message generated on this broker.
       long internalID = idSupplier.getID(ref);
-      if (logger.isTraceEnabled()) {
-         logger.trace("{} sending ack message from server {} with 
messageID={}", server, nodeID, internalID);
+      Message messageCommand = createMessage(ref.getQueue().getAddress(), 
ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
+      if (sync) {
+         OperationContext operationContext;
+         operationContext = 
OperationContextImpl.getContext(server.getExecutorFactory());
+         messageCommand.setUserContext(OperationContext.class, 
operationContext);
+         if (tx == null) {
+            // notice that if transactional, the context is lined up on 
beforeCommit as part of the transaction operation
+            operationContext.replicationLineUp();
+         }
+      }
+
+      if (tx != null) {
+         MirrorACKOperation operation = getAckOperation(tx);
+         // notice the operationContext.replicationLineUp is done on 
beforeCommit as part of the TX
+         operation.addMessage(messageCommand, ref);
+      } else {
+         server.getStorageManager().afterStoreOperations(new IOCallback() {
+            @Override
+            public void done() {
+               try {
+                  logger.debug("Routing ack out of message {}", ref);
+                  route(server, messageCommand);
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+            }
+
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+            }
+         });
+      }
+   }
+
+   private MirrorACKOperation getAckOperation(Transaction tx) {
+      MirrorACKOperation ackOperation = (MirrorACKOperation) 
tx.getProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION);
+      if (ackOperation == null) {
+         logger.trace("getAckOperation::setting operation on transaction {}", 
tx);
+         ackOperation = new MirrorACKOperation(server);
+         tx.putProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION, 
ackOperation);
+         tx.afterStore(ackOperation);
+      }
+
+      return ackOperation;
+   }
+
+   private MirrorSendOperation getSendOperation(Transaction tx) {
+      if (tx == null) {
+         return null;
+      }
+      MirrorSendOperation sendOperation = (MirrorSendOperation) 
tx.getProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION);
+      if (sendOperation == null) {
+         logger.trace("getSendOperation::setting operation on transaction {}", 
tx);
+         sendOperation = new MirrorSendOperation();
+         tx.putProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION, 
sendOperation);
+         tx.afterStore(sendOperation);
+      }
+
+      return sendOperation;
+   }
+
+   private static class MirrorACKOperation extends 
TransactionOperationAbstract {
+
+      final ActiveMQServer server;
+
+      final LinkedList<MessageReference> refs = new LinkedList<>();
+      final LinkedList<Message> messages = new LinkedList<>();
+
+      MirrorACKOperation(ActiveMQServer server) {
+         this.server = server;
+      }
+
+      /**
+       *
+       * @param message the message with the instruction to ack on the target 
node. Notice this is not the message owned by the reference.
+       * @param ref the reference being acked
+       */
+      public void addMessage(Message message, MessageReference ref) {
+         refs.add(ref);
+         messages.add(message);
+      }
+
+      @Override
+      public void beforeCommit(Transaction tx) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("MirrorACKOperation::beforeCommit processing {}", 
messages);
+         }
+         for (Message message : messages) {
+            OperationContext context = (OperationContext) 
message.getUserContext(OperationContext.class);
+            if (context != null) {
+               context.replicationLineUp();
+            }
+         }
+      }
+
+      @Override
+      public void afterCommit(Transaction tx) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("MirrorACKOperation::afterCommit processing {}", 
messages);
+         }
+         for (Message message : messages) {
+            try {
+               route(server, message);
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+            }
+         }
+         for (MessageReference ref : refs) {
+            ref.getMessage().usageDown();
+         }
+      }
+
+      @Override
+      public void afterRollback(Transaction tx) {
+         for (Message message : messages) {
+            OperationContext context = (OperationContext) 
message.getUserContext(OperationContext.class);
+            context.replicationDone();
+         }
+      }
+
+   }
+
+   private static final class MirrorSendOperation extends 
TransactionOperationAbstract {
+      final List<MessageReference> mirroredRefs = new LinkedList<>();
+
+      @Override
+      public void beforeCommit(Transaction tx) {
+         for (MessageReference ref : mirroredRefs) {
+            OperationContext context = 
ref.getProtocolData(OperationContext.class);
+            if (context != null) {
+               context.replicationLineUp();
+            }
+         }
+      }
+
+      @Override
+      public void afterRollback(Transaction tx) {
+         if (logger.isDebugEnabled()) {

Review Comment:
   Unneeded gate check



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -318,28 +404,185 @@ public void postAcknowledge(MessageReference ref, 
AckReason reason) throws Excep
 
       if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() || 
ref.getQueue().isMirrorController()))) {
          if (logger.isDebugEnabled()) {
-            logger.debug("{} rejecting postAcknowledge queue={}, ref={} to 
avoid infinite loop with the mirror (reflection)", server, 
ref.getQueue().getName(), ref);
+            logger.debug("{} rejecting preAcknowledge queue={}, ref={} to 
avoid infinite loop with the mirror (reflection)", server, 
ref.getQueue().getName(), ref);
          }
          return;
       }
 
       if (ignoreAddress(ref.getQueue().getAddress())) {
          if (logger.isTraceEnabled()) {
-            logger.trace("{} rejecting postAcknowledge queue={}, ref={}, queue 
address is excluded", server, ref.getQueue().getName(), ref);
+            logger.trace("{} rejecting preAcknowledge queue={}, ref={}, queue 
address is excluded", server, ref.getQueue().getName(), ref);
          }
          return;
       }
 
-      logger.trace("{} postAcknowledge {}", server, ref);
+      logger.trace("{} preAcknowledge {}", server, ref);
 
       String nodeID = idSupplier.getServerID(ref); // notice the brokerID will 
be null for any message generated on this broker.
       long internalID = idSupplier.getID(ref);
-      if (logger.isTraceEnabled()) {
-         logger.trace("{} sending ack message from server {} with 
messageID={}", server, nodeID, internalID);
+      Message messageCommand = createMessage(ref.getQueue().getAddress(), 
ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
+      if (sync) {
+         OperationContext operationContext;
+         operationContext = 
OperationContextImpl.getContext(server.getExecutorFactory());
+         messageCommand.setUserContext(OperationContext.class, 
operationContext);
+         if (tx == null) {
+            // notice that if transactional, the context is lined up on 
beforeCommit as part of the transaction operation
+            operationContext.replicationLineUp();
+         }
+      }
+
+      if (tx != null) {
+         MirrorACKOperation operation = getAckOperation(tx);
+         // notice the operationContext.replicationLineUp is done on 
beforeCommit as part of the TX
+         operation.addMessage(messageCommand, ref);
+      } else {
+         server.getStorageManager().afterStoreOperations(new IOCallback() {
+            @Override
+            public void done() {
+               try {
+                  logger.debug("Routing ack out of message {}", ref);
+                  route(server, messageCommand);
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+            }
+
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+            }
+         });
+      }
+   }
+
+   private MirrorACKOperation getAckOperation(Transaction tx) {
+      MirrorACKOperation ackOperation = (MirrorACKOperation) 
tx.getProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION);
+      if (ackOperation == null) {
+         logger.trace("getAckOperation::setting operation on transaction {}", 
tx);
+         ackOperation = new MirrorACKOperation(server);
+         tx.putProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION, 
ackOperation);
+         tx.afterStore(ackOperation);
+      }
+
+      return ackOperation;
+   }
+
+   private MirrorSendOperation getSendOperation(Transaction tx) {
+      if (tx == null) {
+         return null;
+      }
+      MirrorSendOperation sendOperation = (MirrorSendOperation) 
tx.getProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION);
+      if (sendOperation == null) {
+         logger.trace("getSendOperation::setting operation on transaction {}", 
tx);
+         sendOperation = new MirrorSendOperation();
+         tx.putProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION, 
sendOperation);
+         tx.afterStore(sendOperation);
+      }
+
+      return sendOperation;
+   }
+
+   private static class MirrorACKOperation extends 
TransactionOperationAbstract {
+
+      final ActiveMQServer server;
+
+      final LinkedList<MessageReference> refs = new LinkedList<>();
+      final LinkedList<Message> messages = new LinkedList<>();
+
+      MirrorACKOperation(ActiveMQServer server) {
+         this.server = server;
+      }
+
+      /**
+       *
+       * @param message the message with the instruction to ack on the target 
node. Notice this is not the message owned by the reference.
+       * @param ref the reference being acked
+       */
+      public void addMessage(Message message, MessageReference ref) {
+         refs.add(ref);
+         messages.add(message);
+      }
+
+      @Override
+      public void beforeCommit(Transaction tx) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("MirrorACKOperation::beforeCommit processing {}", 
messages);
+         }
+         for (Message message : messages) {
+            OperationContext context = (OperationContext) 
message.getUserContext(OperationContext.class);
+            if (context != null) {
+               context.replicationLineUp();
+            }
+         }
+      }
+
+      @Override
+      public void afterCommit(Transaction tx) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("MirrorACKOperation::afterCommit processing {}", 
messages);
+         }
+         for (Message message : messages) {
+            try {
+               route(server, message);
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+            }
+         }
+         for (MessageReference ref : refs) {
+            ref.getMessage().usageDown();
+         }
+      }
+
+      @Override
+      public void afterRollback(Transaction tx) {
+         for (Message message : messages) {
+            OperationContext context = (OperationContext) 
message.getUserContext(OperationContext.class);
+            context.replicationDone();
+         }
+      }
+
+   }
+
+   private static final class MirrorSendOperation extends 
TransactionOperationAbstract {
+      final List<MessageReference> mirroredRefs = new LinkedList<>();
+
+      @Override
+      public void beforeCommit(Transaction tx) {
+         for (MessageReference ref : mirroredRefs) {
+            OperationContext context = 
ref.getProtocolData(OperationContext.class);
+            if (context != null) {
+               context.replicationLineUp();
+            }
+         }
+      }
+
+      @Override
+      public void afterRollback(Transaction tx) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("rolling back context for {} times", 
mirroredRefs.size());
+         }
+         for (MessageReference mirroredRef : mirroredRefs) {
+            OperationContext localCTX = 
mirroredRef.getProtocolData(OperationContext.class);
+            if (localCTX != null) {
+               localCTX.replicationDone();
+            }
+         }
+      }
+
+      @Override
+      public void afterCommit(Transaction tx) {
+         if (logger.isTraceEnabled()) {

Review Comment:
   Ditto: unneeded gate check.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java:
##########
@@ -128,11 +133,17 @@ public void storeLineUp() {
    @Override
    public void replicationLineUp() {
       REPLICATION_LINEUP_UPDATER.incrementAndGet(this);
+      if (logger.isTraceEnabled()) {

Review Comment:
   Unneeded gate check



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSyncMirrorTest.java:
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.JournalUpdateCallback;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import 
org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
+import 
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQPSyncMirrorTest extends AmqpClientTestSupport {
+
+   Logger logger = LoggerFactory.getLogger(AMQPSyncMirrorTest.class);
+
+   private static final String SLOW_SERVER_NAME = "slow";
+   private static final int SLOW_SERVER_PORT = AMQP_PORT + 1;
+
+   private ActiveMQServer slowServer;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+   }
+
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,OPENWIRE,CORE";
+   }
+
+   @Test
+   public void testPersistedSendAMQP() throws Exception {
+      testPersistedSend("AMQP", false, 100);
+   }
+
+   @Test
+   public void testPersistedSendAMQPLarge() throws Exception {
+      testPersistedSend("AMQP", false, 200 * 1024);
+   }
+
+
+   @Test
+   public void testPersistedSendCore() throws Exception {
+      testPersistedSend("CORE", false, 100);
+   }
+
+   @Test
+   public void testPersistedSendCoreLarge() throws Exception {
+      testPersistedSend("CORE", false, 200 * 1024);
+   }
+
+   @Test
+   public void testPersistedSendAMQPTXLarge() throws Exception {
+      testPersistedSend("AMQP", true, 200 * 1024);
+   }
+
+   @Test
+   public void testPersistedSendAMQPTX() throws Exception {
+      testPersistedSend("AMQP", true, 100);
+   }
+
+   @Test
+   public void testPersistedSendCoreTX() throws Exception {
+      testPersistedSend("CORE", true, 100);
+   }
+
+   @Test
+   public void testPersistedSendCoreTXLarge() throws Exception {
+      testPersistedSend("CORE", true, 200 * 1024);
+   }
+
+   private void testPersistedSend(String protocol, boolean transactional, int 
messageSize) throws Exception {
+      ReusableLatch sendPending = new ReusableLatch(0);
+      Semaphore semSend = new Semaphore(1);
+      Semaphore semAck = new Semaphore(1);
+      AtomicInteger errors = new AtomicInteger(0);
+
+      try {
+         final int NUMBER_OF_MESSAGES = 10;
+
+         AtomicInteger countStored = new AtomicInteger(0);
+
+         slowServer = createServerWithCallbackStorage(SLOW_SERVER_PORT, 
SLOW_SERVER_NAME, (isUpdate, isTX, txId, id, recordType, persister, record) -> {
+            if (logger.isDebugEnabled()) {
+               logger.debug("StorageCallback::slow isUpdate={}, isTX={}, 
txID={}, id={},recordType={}, record={}", isUpdate, isTX, txId, id, recordType, 
record);
+            }
+            if (transactional) {
+               if (isTX) {
+                  try {
+                     if (countStored.get() > 0) {
+                        countStored.incrementAndGet();
+                        logger.trace("semSend.tryAcquire");
+                        if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
+                           logger.trace("acquired TX, now release");
+                           semSend.release();
+                        }
+                     }
+                  } catch (Exception e) {
+                     e.printStackTrace();
+                  }
+               }
+            }
+            if (recordType == JournalRecordIds.ACKNOWLEDGE_REF) {
+               logger.debug("slow ACK REF");
+               try {
+                  if (semAck.tryAcquire(20, TimeUnit.SECONDS)) {
+                     semAck.release();
+                     logger.trace("slow acquired ACK semaphore");
+                  } else {
+                     logger.trace("Semaphore wasn't acquired");
+                  }
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+            }
+            if (recordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL) {
+               try {
+                  countStored.incrementAndGet();
+                  if (!transactional) {
+                     logger.trace("semSend.tryAcquire");
+                     if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
+                        logger.trace("acquired non TX now release");
+                        semSend.release();
+                     }
+                  }
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+                  errors.incrementAndGet();
+               }
+            }
+         });
+         slowServer.setIdentity("slowServer");
+         server.setIdentity("server");
+
+         ExecutorService pool = Executors.newFixedThreadPool(5);
+         runAfter(pool::shutdown);
+
+         AMQPMirrorBrokerConnectionElement replication = 
configureMirrorTowardsSlow(server);
+
+         slowServer.getConfiguration().setName("slow");
+         server.getConfiguration().setName("fast");
+         slowServer.start();
+         server.start();
+
+         waitForServerToStart(slowServer);
+         waitForServerToStart(server);
+
+         server.addAddressInfo(new 
AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+         server.createQueue(new 
QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
+
+         Wait.waitFor(() -> slowServer.locateQueue(getQueueName()) != null);
+         Queue replicatedQueue = slowServer.locateQueue(getQueueName());
+
+         ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + AMQP_PORT);
+
+         if (factory instanceof ActiveMQConnectionFactory) {
+            ((ActiveMQConnectionFactory) 
factory).getServerLocator().setBlockOnAcknowledge(true);
+         }
+
+         Connection connection = factory.createConnection();
+         runAfter(connection::close);
+         Session session = connection.createSession(transactional, 
transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
+         MessageProducer producer = 
session.createProducer(session.createQueue(getQueueName()));
+
+         connection.start();
+
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         final String bodyMessage;
+         {
+            StringBuffer buffer = new StringBuffer();
+            for (int i = 0; i < messageSize; i++) {
+               buffer.append("large Buffer...");
+            }
+            bodyMessage = buffer.toString();
+         }
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            if (logger.isTraceEnabled()) {

Review Comment:
   Unneeded gate check



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -239,37 +249,107 @@ public void sendMessage(Message message, RoutingContext 
context, List<MessageRef
       try {
          context.setReusable(false);
 
-         MessageReference ref = 
MessageReference.Factory.createReference(message, snfQueue);
-         String nodeID = setProtocolData(idSupplier, ref);
+         String nodeID = idSupplier.getServerID(message);
+
          if (nodeID != null && nodeID.equals(getRemoteMirrorId())) {
             logger.trace("Message {} already belonged to the node, {}, it 
won't circle send", message, getRemoteMirrorId());
             return;
          }
+
+         MessageReference ref = 
MessageReference.Factory.createReference(message, snfQueue);
+         setProtocolData(ref, nodeID, idSupplier.getID(ref));
+
          snfQueue.refUp(ref);
+         List<MessageReference> refs;
+
+         if (tx != null) {
+            if (logger.isDebugEnabled()) {
+               logger.debug("Mirroring Message " + message + " with TX");
+            }
+            MirrorSendOperation mirrorSendOperation = getSendOperation(tx);
+            refs = mirrorSendOperation.mirroredRefs;
+         } else {
+            refs = new LinkedList<>();
+         }
+
          refs.add(ref);
 
+         if (sync) {
+            OperationContext operContext = 
OperationContextImpl.getContext(server.getExecutorFactory());
+            if (tx == null) {
+               // notice that if transactional, the context is lined up on 
beforeCommit as part of the transaction operation
+               operContext.replicationLineUp();
+            }
+            if (logger.isDebugEnabled()) {
+               logger.debug("mirror syncUp context={}, ref={}", operContext, 
ref);
+            }
+            ref.setProtocolData(OperationContext.class, operContext);
+         }
+
          if (message.isDurable() && snfQueue.isDurable()) {
             PostOfficeImpl.storeDurableReference(server.getStorageManager(), 
message, context.getTransaction(), snfQueue, true);
          }
 
+         if (tx == null) {
+            server.getStorageManager().afterStoreOperations(new IOCallback() {
+               @Override
+               public void done() {
+                  PostOfficeImpl.processReferences(refs, false);
+               }
+
+               @Override
+               public void onError(int errorCode, String errorMessage) {
+               }
+            });
+         }
       } catch (Throwable e) {
          logger.warn(e.getMessage(), e);
       }
    }
 
+   private void syncDone(MessageReference reference) {
+      OperationContext ctx = reference.getProtocolData(OperationContext.class);
+      if (ctx != null) {
+         ctx.replicationDone();
+         if (logger.isDebugEnabled()) {

Review Comment:
   Remove gate check.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -239,37 +249,107 @@ public void sendMessage(Message message, RoutingContext 
context, List<MessageRef
       try {
          context.setReusable(false);
 
-         MessageReference ref = 
MessageReference.Factory.createReference(message, snfQueue);
-         String nodeID = setProtocolData(idSupplier, ref);
+         String nodeID = idSupplier.getServerID(message);
+
          if (nodeID != null && nodeID.equals(getRemoteMirrorId())) {
             logger.trace("Message {} already belonged to the node, {}, it 
won't circle send", message, getRemoteMirrorId());
             return;
          }
+
+         MessageReference ref = 
MessageReference.Factory.createReference(message, snfQueue);
+         setProtocolData(ref, nodeID, idSupplier.getID(ref));
+
          snfQueue.refUp(ref);
+         List<MessageReference> refs;
+
+         if (tx != null) {
+            if (logger.isDebugEnabled()) {
+               logger.debug("Mirroring Message " + message + " with TX");
+            }
+            MirrorSendOperation mirrorSendOperation = getSendOperation(tx);
+            refs = mirrorSendOperation.mirroredRefs;
+         } else {
+            refs = new LinkedList<>();

Review Comment:
   Seems like this could be made a Collections.singletonList instead of a 
LinkedList, given that there is only ever going to be one entry from what I can 
see; you would just need to move refs.add up into the other if block.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java:
##########
@@ -128,11 +133,17 @@ public void storeLineUp() {
    @Override
    public void replicationLineUp() {
       REPLICATION_LINEUP_UPDATER.incrementAndGet(this);
+      if (logger.isTraceEnabled()) {
+         logger.trace("replicationLineUp:: {}", replicationLineUpField);
+      }
    }
 
    @Override
    public synchronized void replicationDone() {
       replicated++;
+      if (logger.isTraceEnabled()) {

Review Comment:
   Unneeded gate check



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -303,8 +379,18 @@ private static Properties getProperties(Message message) {
       }
    }
 
+   private void postACKInternalMessage(MessageReference reference) {
+      if (logger.isDebugEnabled()) {

Review Comment:
   Unnecessary gate check



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -107,6 +115,7 @@ public boolean isStarted() {
    public AMQPMirrorControllerSource(ProtonProtocolManager 
protonProtocolManager, Queue snfQueue, ActiveMQServer server, 
AMQPMirrorBrokerConnectionElement replicaConfig,
                                      AMQPBrokerConnection brokerConnection) {
       super(server);
+      assert server != null;

Review Comment:
   Generally, in a constructor I'd rather the contract be explicitly tested with 
Objects.requireNonNull(x) rather than an assert.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSyncMirrorTest.java:
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.JournalUpdateCallback;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import 
org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
+import 
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQPSyncMirrorTest extends AmqpClientTestSupport {
+
+   Logger logger = LoggerFactory.getLogger(AMQPSyncMirrorTest.class);
+
+   private static final String SLOW_SERVER_NAME = "slow";
+   private static final int SLOW_SERVER_PORT = AMQP_PORT + 1;
+
+   private ActiveMQServer slowServer;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+   }
+
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,OPENWIRE,CORE";
+   }
+
+   @Test
+   public void testPersistedSendAMQP() throws Exception {
+      testPersistedSend("AMQP", false, 100);
+   }
+
+   @Test
+   public void testPersistedSendAMQPLarge() throws Exception {
+      testPersistedSend("AMQP", false, 200 * 1024);
+   }
+
+
+   @Test
+   public void testPersistedSendCore() throws Exception {
+      testPersistedSend("CORE", false, 100);
+   }
+
+   @Test
+   public void testPersistedSendCoreLarge() throws Exception {
+      testPersistedSend("CORE", false, 200 * 1024);
+   }
+
+   @Test
+   public void testPersistedSendAMQPTXLarge() throws Exception {
+      testPersistedSend("AMQP", true, 200 * 1024);
+   }
+
+   @Test
+   public void testPersistedSendAMQPTX() throws Exception {
+      testPersistedSend("AMQP", true, 100);
+   }
+
+   @Test
+   public void testPersistedSendCoreTX() throws Exception {
+      testPersistedSend("CORE", true, 100);
+   }
+
+   @Test
+   public void testPersistedSendCoreTXLarge() throws Exception {
+      testPersistedSend("CORE", true, 200 * 1024);
+   }
+
+   private void testPersistedSend(String protocol, boolean transactional, int 
messageSize) throws Exception {
+      ReusableLatch sendPending = new ReusableLatch(0);
+      Semaphore semSend = new Semaphore(1);
+      Semaphore semAck = new Semaphore(1);
+      AtomicInteger errors = new AtomicInteger(0);
+
+      try {
+         final int NUMBER_OF_MESSAGES = 10;
+
+         AtomicInteger countStored = new AtomicInteger(0);
+
+         slowServer = createServerWithCallbackStorage(SLOW_SERVER_PORT, 
SLOW_SERVER_NAME, (isUpdate, isTX, txId, id, recordType, persister, record) -> {
+            if (logger.isDebugEnabled()) {
+               logger.debug("StorageCallback::slow isUpdate={}, isTX={}, 
txID={}, id={},recordType={}, record={}", isUpdate, isTX, txId, id, recordType, 
record);
+            }
+            if (transactional) {
+               if (isTX) {
+                  try {
+                     if (countStored.get() > 0) {
+                        countStored.incrementAndGet();
+                        logger.trace("semSend.tryAcquire");
+                        if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
+                           logger.trace("acquired TX, now release");
+                           semSend.release();
+                        }
+                     }
+                  } catch (Exception e) {
+                     e.printStackTrace();
+                  }
+               }
+            }
+            if (recordType == JournalRecordIds.ACKNOWLEDGE_REF) {
+               logger.debug("slow ACK REF");
+               try {
+                  if (semAck.tryAcquire(20, TimeUnit.SECONDS)) {
+                     semAck.release();
+                     logger.trace("slow acquired ACK semaphore");
+                  } else {
+                     logger.trace("Semaphore wasn't acquired");
+                  }
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+            }
+            if (recordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL) {
+               try {
+                  countStored.incrementAndGet();
+                  if (!transactional) {
+                     logger.trace("semSend.tryAcquire");
+                     if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
+                        logger.trace("acquired non TX now release");
+                        semSend.release();
+                     }
+                  }
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+                  errors.incrementAndGet();
+               }
+            }
+         });
+         slowServer.setIdentity("slowServer");
+         server.setIdentity("server");
+
+         ExecutorService pool = Executors.newFixedThreadPool(5);
+         runAfter(pool::shutdown);
+
+         AMQPMirrorBrokerConnectionElement replication = 
configureMirrorTowardsSlow(server);
+
+         slowServer.getConfiguration().setName("slow");
+         server.getConfiguration().setName("fast");
+         slowServer.start();
+         server.start();
+
+         waitForServerToStart(slowServer);
+         waitForServerToStart(server);
+
+         server.addAddressInfo(new 
AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+         server.createQueue(new 
QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
+
+         Wait.waitFor(() -> slowServer.locateQueue(getQueueName()) != null);
+         Queue replicatedQueue = slowServer.locateQueue(getQueueName());
+
+         ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + AMQP_PORT);
+
+         if (factory instanceof ActiveMQConnectionFactory) {
+            ((ActiveMQConnectionFactory) 
factory).getServerLocator().setBlockOnAcknowledge(true);
+         }
+
+         Connection connection = factory.createConnection();
+         runAfter(connection::close);
+         Session session = connection.createSession(transactional, 
transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
+         MessageProducer producer = 
session.createProducer(session.createQueue(getQueueName()));
+
+         connection.start();
+
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         final String bodyMessage;
+         {
+            StringBuffer buffer = new StringBuffer();
+            for (int i = 0; i < messageSize; i++) {
+               buffer.append("large Buffer...");
+            }
+            bodyMessage = buffer.toString();
+         }
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            if (logger.isTraceEnabled()) {
+               logger.trace("===>>> send message {}", i);
+            }
+            int theI = i;
+            sendPending.countUp();
+            logger.trace("semSend.acquire");
+            semSend.acquire();
+            if (!transactional) {
+               pool.execute(() -> {
+                  try {
+                     if (logger.isTraceEnabled()) {
+                        logger.trace("Entering non TX send with sendPending = 
" + sendPending.getCount());
+                     }
+                     TextMessage message = 
session.createTextMessage(bodyMessage);
+                     message.setStringProperty("strProperty", "" + theI);
+                     producer.send(message);
+                     sendPending.countDown();
+                     if (logger.isTraceEnabled()) {

Review Comment:
   Same as above.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSyncMirrorTest.java:
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.JournalUpdateCallback;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import 
org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
+import 
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQPSyncMirrorTest extends AmqpClientTestSupport {
+
+   Logger logger = LoggerFactory.getLogger(AMQPSyncMirrorTest.class);
+
+   private static final String SLOW_SERVER_NAME = "slow";
+   private static final int SLOW_SERVER_PORT = AMQP_PORT + 1;
+
+   private ActiveMQServer slowServer;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+   }
+
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,OPENWIRE,CORE";
+   }
+
+   @Test
+   public void testPersistedSendAMQP() throws Exception {
+      testPersistedSend("AMQP", false, 100);
+   }
+
+   @Test
+   public void testPersistedSendAMQPLarge() throws Exception {
+      testPersistedSend("AMQP", false, 200 * 1024);
+   }
+
+
+   @Test
+   public void testPersistedSendCore() throws Exception {
+      testPersistedSend("CORE", false, 100);
+   }
+
+   @Test
+   public void testPersistedSendCoreLarge() throws Exception {
+      testPersistedSend("CORE", false, 200 * 1024);
+   }
+
+   @Test
+   public void testPersistedSendAMQPTXLarge() throws Exception {
+      testPersistedSend("AMQP", true, 200 * 1024);
+   }
+
+   @Test
+   public void testPersistedSendAMQPTX() throws Exception {
+      testPersistedSend("AMQP", true, 100);
+   }
+
+   @Test
+   public void testPersistedSendCoreTX() throws Exception {
+      testPersistedSend("CORE", true, 100);
+   }
+
+   @Test
+   public void testPersistedSendCoreTXLarge() throws Exception {
+      testPersistedSend("CORE", true, 200 * 1024);
+   }
+
+   private void testPersistedSend(String protocol, boolean transactional, int 
messageSize) throws Exception {
+      ReusableLatch sendPending = new ReusableLatch(0);
+      Semaphore semSend = new Semaphore(1);
+      Semaphore semAck = new Semaphore(1);
+      AtomicInteger errors = new AtomicInteger(0);
+
+      try {
+         final int NUMBER_OF_MESSAGES = 10;
+
+         AtomicInteger countStored = new AtomicInteger(0);
+
+         slowServer = createServerWithCallbackStorage(SLOW_SERVER_PORT, 
SLOW_SERVER_NAME, (isUpdate, isTX, txId, id, recordType, persister, record) -> {
+            if (logger.isDebugEnabled()) {
+               logger.debug("StorageCallback::slow isUpdate={}, isTX={}, 
txID={}, id={},recordType={}, record={}", isUpdate, isTX, txId, id, recordType, 
record);
+            }
+            if (transactional) {
+               if (isTX) {
+                  try {
+                     if (countStored.get() > 0) {
+                        countStored.incrementAndGet();
+                        logger.trace("semSend.tryAcquire");
+                        if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
+                           logger.trace("acquired TX, now release");
+                           semSend.release();
+                        }
+                     }
+                  } catch (Exception e) {
+                     e.printStackTrace();
+                  }
+               }
+            }
+            if (recordType == JournalRecordIds.ACKNOWLEDGE_REF) {
+               logger.debug("slow ACK REF");
+               try {
+                  if (semAck.tryAcquire(20, TimeUnit.SECONDS)) {
+                     semAck.release();
+                     logger.trace("slow acquired ACK semaphore");
+                  } else {
+                     logger.trace("Semaphore wasn't acquired");
+                  }
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+            }
+            if (recordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL) {
+               try {
+                  countStored.incrementAndGet();
+                  if (!transactional) {
+                     logger.trace("semSend.tryAcquire");
+                     if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
+                        logger.trace("acquired non TX now release");
+                        semSend.release();
+                     }
+                  }
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+                  errors.incrementAndGet();
+               }
+            }
+         });
+         slowServer.setIdentity("slowServer");
+         server.setIdentity("server");
+
+         ExecutorService pool = Executors.newFixedThreadPool(5);
+         runAfter(pool::shutdown);
+
+         AMQPMirrorBrokerConnectionElement replication = 
configureMirrorTowardsSlow(server);
+
+         slowServer.getConfiguration().setName("slow");
+         server.getConfiguration().setName("fast");
+         slowServer.start();
+         server.start();
+
+         waitForServerToStart(slowServer);
+         waitForServerToStart(server);
+
+         server.addAddressInfo(new 
AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+         server.createQueue(new 
QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
+
+         Wait.waitFor(() -> slowServer.locateQueue(getQueueName()) != null);
+         Queue replicatedQueue = slowServer.locateQueue(getQueueName());
+
+         ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + AMQP_PORT);
+
+         if (factory instanceof ActiveMQConnectionFactory) {
+            ((ActiveMQConnectionFactory) 
factory).getServerLocator().setBlockOnAcknowledge(true);
+         }
+
+         Connection connection = factory.createConnection();
+         runAfter(connection::close);
+         Session session = connection.createSession(transactional, 
transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
+         MessageProducer producer = 
session.createProducer(session.createQueue(getQueueName()));
+
+         connection.start();
+
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         final String bodyMessage;
+         {
+            StringBuffer buffer = new StringBuffer();
+            for (int i = 0; i < messageSize; i++) {
+               buffer.append("large Buffer...");
+            }
+            bodyMessage = buffer.toString();
+         }
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            if (logger.isTraceEnabled()) {
+               logger.trace("===>>> send message {}", i);
+            }
+            int theI = i;
+            sendPending.countUp();
+            logger.trace("semSend.acquire");
+            semSend.acquire();
+            if (!transactional) {
+               pool.execute(() -> {
+                  try {
+                     if (logger.isTraceEnabled()) {

Review Comment:
   Wouldn't need a gate check here if the trace used proper formatting.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 840411)
    Time Spent: 50m  (was: 40m)

> Mirror sync replication
> -----------------------
>
>                 Key: ARTEMIS-4136
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4136
>             Project: ActiveMQ Artemis
>          Issue Type: New Feature
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>             Fix For: 2.28.0
>
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> I'm adding an option sync=true|false on the mirror.
> It will be possible to configure a mirror like this:
>      <broker-connections>
>          <amqp-connection uri="tcp://test1:111" name="test1" 
> retry-interval="333" reconnect-attempts="33" user="testuser" 
> password="testpassword">
>             <mirror sync="true"/>
>        </amqp-connection>
>    </broker-connections>
> If sync is set to true, any blocking client operation will wait for a mirror 
> callback.
> With that option set, any blocking operation on the broker will wait for a 
> mirror roundtrip:
> tx.commit(), session.send (non-transactional), client.ack (when configured as 
> sync).
> Notice that in AMQP, client dispositions are always asynchronous; hence it's 
> only possible to sync acks when using transactions with AMQP.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to